KMR
kmrmoreops.c
Go to the documentation of this file.
1 /* kmrmoreops.c (2014-02-04) */
2 /* Copyright (C) 2012-2016 RIKEN AICS */
3 
4 /** \file kmrmoreops.c More Operations on Key-Value Stream. */
5 
6 #include <mpi.h>
7 #include <stdlib.h>
8 #include <string.h>
9 #include <unistd.h>
10 #include <limits.h>
11 #include <errno.h>
12 #include <assert.h>
13 //#define __XPG4_CHAR_CLASS__
14 //#include <ctype.h>
15 #include "kmr.h"
16 #include "kmrimpl.h"
17 
/* Two-way minimum and maximum.  These are plain macros, so an argument
   may be evaluated twice; do not pass expressions with side effects. */
#define MIN(a,b) ((a) < (b) ? (a) : (b))
#define MAX(a,b) ((b) < (a) ? (a) : (b))
/* A constant zero (false).  It is not referenced in this part of the
   file. */
#define NEVERHERE 0
21 
22 static int
23 kmr_find_key_fn(const struct kmr_kv_box kv,
24  const KMR_KVS *kvs, KMR_KVS *kvo, void *p, const long i)
25 {
26  const struct kmr_kv_box *p0 = p;
27  const struct kmr_kv_box kv0 = *p0;
28  kmr_sorter_t cmp = kmr_choose_sorter(kvs);
29  if (cmp(&kv0, &kv) == 0) {
30  kmr_add_kv(kvo, kv);
31  }
32  return MPI_SUCCESS;
33 }
34 
35 /** Finds a key-value pair for a key. It is an error when not exactly
36  one entry is found. It does not consume the input KVS KVI. The
37  returned key-value entry must be used before freeing the input
38  KVS, when it points to an opaque data. It maps internally, so it
39  is slow. It is tricky that the internally created KVS KVS0 points
40  to the key-value area in the input KVS KVI. */
41 
42 int
43 kmr_find_key(KMR_KVS *kvi, struct kmr_kv_box ki, struct kmr_kv_box *ko)
44 {
45  enum kmr_kv_field keyf = kmr_unit_sized_with_unmanaged(kvi->c.key_data);
46  enum kmr_kv_field valf = kmr_unit_sized_with_unmanaged(kvi->c.value_data);
47  KMR_KVS *kvs0 = kmr_create_kvs(kvi->c.mr, keyf, valf);
48  struct kmr_option insepct = {.inspect = 1};
49  int cc = kmr_map(kvi, kvs0, &ki, insepct, kmr_find_key_fn);
50  if (kvs0->c.element_count == 1) {
51  cc = kmr_take_one(kvs0, ko);
52  assert(cc == MPI_SUCCESS);
53  } else {
54  cc = ((kvs0->c.element_count == 0) ? MPI_ERR_ARG : MPI_ERR_COUNT);
55  {
56  KMR *mr = kvi->c.mr;
57  char ee[80];
58  snprintf(ee, 80, "kmr_find_key with not one key (keys=%ld)",
59  kvs0->c.element_count);
60  kmr_error(mr, ee);
61  }
62  }
63  kmr_free_kvs(kvs0);
64  return cc;
65 }
66 
67 /** Finds the key K in the key-value stream KVS. It returns a pointer
68  pointing inside the key-value stream. It is an error when not
69  exactly one entry is found. It does not consume the input KVS.
70  It maps internally, so slow. */
71 
72 int
73 kmr_find_string(KMR_KVS *kvi, const char *k, const char **vq)
74 {
75  assert(k != 0 && vq != 0);
76  assert(kvi->c.key_data == KMR_KV_OPAQUE
77  || kvi->c.key_data == KMR_KV_CSTRING);
78  int klen = ((int)strlen(k) + 1);
79  struct kmr_kv_box ki = {.klen = klen, .k.p = k, .vlen = 0, .v.p = 0};
80  struct kmr_kv_box vo;
81  int cc = kmr_find_key(kvi, ki, &vo);
82  if (cc != MPI_SUCCESS) {
83  if (cc == MPI_ERR_ARG) {
84  kmr_warning(kvi->c.mr, 1, "kmr_find_key no key found");
85  } else {
86  kmr_warning(kvi->c.mr, 1, "kmr_find_key multiple keys found");
87  }
88  }
89  *vq = vo.v.p;
90  return cc;
91 }
92 
93 /* Totals the collected element-counts from all ranks. It is called
94  once with as many pairs as the number of ranks. */
95 
96 static int
97 kmr_get_element_count_fn(const struct kmr_kv_box kv[], const long n,
98  const KMR_KVS *kvs, KMR_KVS *kvo, void *p)
99 {
100  long *cntq = p;
101  assert(kvo == 0 && kvs->c.mr->nprocs == n && *cntq == 0);
102  long cnt = 0;
103  for (int i = 0; i < n; i++) {
104  cnt += kv[i].v.i;
105  }
106  *cntq = cnt;
107  return MPI_SUCCESS;
108 }
109 
110 /** Gets the total number of key-value pairs. It uses replication and
111  reduction. */
112 
113 int
115 {
116  kmr_assert_kvs_ok(kvs, 0, 1, 0);
117  assert(v != 0);
118  int cc;
119  KMR *mr = kvs->c.mr;
120  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
121  struct kmr_kv_box kv = {.klen = (int)sizeof(long),
122  .vlen = (int)sizeof(long),
123  .k.i = 0,
124  .v.i = kvs->c.element_count};
125  cc = kmr_add_kv(kvs0, kv);
126  assert(cc == MPI_SUCCESS);
127  kmr_add_kv_done(kvs0);
128  /* Replicate and reduce to get a total. */
129  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
130  cc = kmr_replicate(kvs0, kvs1, kmr_noopt);
131  assert(cc == MPI_SUCCESS);
132  long cnt = 0;
133  cc = kmr_reduce(kvs1, 0, &cnt, kmr_noopt, kmr_get_element_count_fn);
134  assert(cc == MPI_SUCCESS);
135  *v = cnt;
136  return MPI_SUCCESS;
137 }
138 
139 /* ================================================================ */
140 
141 int
142 kmr_reverse_fn(const struct kmr_kv_box kv,
143  const KMR_KVS *kvs, KMR_KVS *kvo, void *p, const long i)
144 {
145  struct kmr_kv_box nkv = {.klen = kv.vlen,
146  .vlen = kv.klen,
147  .k = kv.v,
148  .v = kv.k};
149  kmr_add_kv(kvo, nkv);
150  return MPI_SUCCESS;
151 }
152 
153 /** Makes a new pair by swapping the key and the value in each pair.
154  That is, it makes new pairs (v0,k0) from (k0,v0). This is a
155  simple mapper. Effective-options: NOTHREADING, INSPECT,
156  KEEP_OPEN, TAKE_CKPT. See struct kmr_option. */
157 
158 int
159 kmr_reverse(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
160 {
161  return kmr_map(kvi, kvo, 0, opt, kmr_reverse_fn);
162 }
163 
164 /* Adds a pairing of a key-value KV under a given key. Field
165  data-types are for the old key and the value. */
166 
167 static inline int
168 kmr_add_pairing_under_key(KMR_KVS *kvo, int klen, union kmr_unit_sized k,
169  const struct kmr_kv_box kv,
170  enum kmr_kv_field keyf, enum kmr_kv_field valf)
171 {
172  assert(kvo->c.value_data == KMR_KV_OPAQUE
173  || kvo->c.value_data == KMR_KV_CSTRING);
174  size_t sz = kmr_kvs_entry_netsize_of_box(kv);
175  assert(kmr_check_alignment(sz));
176  char buf[10 * 1024];
177  struct kmr_kvs_entry *e;
178  if (sz <= sizeof(buf)) {
179  e = (void *)buf;
180  } else {
181  e = kmr_malloc(sz);
182  }
183  kmr_poke_kv2(e, kv, 0, keyf, valf, 0);
184  struct kmr_kv_box kv1 = {.klen = klen,
185  .vlen = (int)sz,
186  .k = k,
187  .v.p = (void *)e};
188  kmr_add_kv(kvo, kv1);
189  if (e != (void *)buf) {
190  kmr_free(e, sz);
191  }
192  return MPI_SUCCESS;
193 }
194 
195 int
196 kmr_pairing_fn(const struct kmr_kv_box kv,
197  const KMR_KVS *kvi, KMR_KVS *kvo, void *p, const long i)
198 {
199  int cc;
200  cc = kmr_add_pairing_under_key(kvo, kv.klen, kv.k, kv,
201  kvi->c.key_data, kvi->c.value_data);
202  assert(cc == MPI_SUCCESS);
203  return MPI_SUCCESS;
204 }
205 
206 /** Replaces a value part with a key-value pairing. That is, it makes
207  new pairs (k0,(k0,v0)) from (k0,v0). See kmr_unpairing(). This
208  is a simple mapper. Effective-options: NOTHREADING, INSPECT,
209  KEEP_OPEN, TAKE_CKPT. See struct kmr_option. */
210 
211 int
212 kmr_pairing(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
213 {
214  return kmr_map(kvi, kvo, 0, opt, kmr_pairing_fn);
215 }
216 
217 int
218 kmr_unpairing_fn(const struct kmr_kv_box kv,
219  const KMR_KVS *kvs, KMR_KVS *kvo, void *p, const long i)
220 {
221  struct kmr_kvs_entry *e = (void *)kv.v.p;
222  struct kmr_kv_box nkv = kmr_pick_kv(e, kvo);
223  kmr_add_kv(kvo, nkv);
224  return MPI_SUCCESS;
225 }
226 
227 /** Extracts a key-value pair from a pairing in the value part,
228  discarding the original key. It is the inverse of kmr_pairing.
229  That is, it makes new pairs (k1,v1) from (k0,(k1,v1)). See
230  kmr_pairing(). This is a simple mapper. Effective-options:
231  NOTHREADING, INSPECT, KEEP_OPEN, TAKE_CKPT. See struct kmr_option. */
232 
233 int
234 kmr_unpairing(KMR_KVS *kvs, KMR_KVS *kvo, struct kmr_option opt)
235 {
236  return kmr_map(kvs, kvo, 0, opt, kmr_unpairing_fn);
237 }
238 
/* (Disabled code.)  Packs two integers into an array.  The value of KV
   points to two longs, which are stored in the array P at the slots
   (2*rank) and (2*rank+1), where the rank is the integer key of KV. */

#if 0
static int
kmr_pack2_fn(const struct kmr_kv_box kv,
             const KMR_KVS *kvs, KMR_KVS *kvo, void *p, const long i)
{
    assert(kvo == 0 && p != 0);
    long *table = p;
    int rank = kv.k.i;
    long *pair = (void *)kv.v.p;
    table[2 * rank] = pair[0];
    table[2 * rank + 1] = pair[1];
    return MPI_SUCCESS;
}
#endif
256 
257 /* Assigns destination ranks for integer keys. */
258 
259 static int
260 kmr_partition_by_ranking_fn(const struct kmr_kv_box kv,
261  const KMR_KVS *kvi, KMR_KVS *kvo, void *p,
262  const long i)
263 {
264  int cc;
265  KMR *mr = kvi->c.mr;
266  int nprocs = mr->nprocs;
267  long *range = p;
268  long minkey = range[0];
269  long maxkey = range[1];
270  long v = kmr_stable_key(kv, kvi);
271  assert(minkey <= v && v < (maxkey + 1));
272  long d = (((maxkey + 1) - minkey + nprocs - 1) / nprocs);
273  int rank = (int)((v - minkey) / d);
274  assert(0 <= rank && rank < nprocs);
275  union kmr_unit_sized k = {.i = rank};
276  cc = kmr_add_pairing_under_key(kvo, sizeof(long), k, kv,
277  kvi->c.key_data, kvi->c.value_data);
278  assert(cc == MPI_SUCCESS);
279  return MPI_SUCCESS;
280 }
281 
282 /* Puts a pairing of a key-value under a rank, where a rank is a sort
283  bucket, a pairing is an original key-value pair. A proper rank is
284  searched in the partition of the key range. */
285 
286 static int
287 kmr_rank_for_sort(const struct kmr_kv_box kv,
288  const KMR_KVS *kvi, KMR_KVS *kvo, void *p, const long i)
289 {
290  struct kmr_kv_box *bv = p;
291  KMR *mr = kvi->c.mr;
292  int nprocs = mr->nprocs;
293  kmr_sorter_t cmp = kmr_choose_sorter(kvi);
294  struct kmr_kv_box *q = kmr_bsearch(&kv, bv, (size_t)(nprocs - 1),
295  sizeof(struct kmr_kv_box),
296  (kmr_qsorter_t)cmp);
297  long rank = (q - bv);
298  assert(0 <= rank && rank < nprocs);
299  union kmr_unit_sized k = {.i = rank};
300  kmr_add_pairing_under_key(kvo, sizeof(long), k, kv,
301  kvi->c.key_data, kvi->c.value_data);
302  return MPI_SUCCESS;
303 }
304 
305 /* Samples key-value pairs, collecting one at index zero and the
306  following by a fixed stride. NSAMPLES be positive. It does not
307  consume the input as INSPECT is asserted. */
308 
309 static int
310 kmr_sample_kv(long nsamples, KMR_KVS *kvi, KMR_KVS *kvo)
311 {
312  assert(nsamples >= 0);
313  if (nsamples > 0) {
314  struct kmr_option inspect = {.inspect = 1};
315  long cnt = kvi->c.element_count;
316  long stride = ((cnt < nsamples) ? 1 : (cnt / nsamples));
317  long limit = (stride * nsamples);
318  int cc = kmr_map_skipping(0, stride, limit, 0, kvi, kvo,
319  0, inspect, kmr_add_identity_fn);
320  assert(cc == MPI_SUCCESS);
321  assert(kvo->c.element_count == MIN(cnt, nsamples));
322  } else {
323  int cc = kmr_add_kv_done(kvo);
324  assert(cc == MPI_SUCCESS);
325  }
326  return MPI_SUCCESS;
327 }
328 
329 static int
330 kmr_sample_to_array(long nsamples, KMR_KVS *kvi, struct kmr_kv_box *bv)
331 {
332  assert(nsamples >= 0);
333  if (nsamples > 0) {
334  int cc;
335  long cnt = kvi->c.element_count;
336  assert(cnt >= nsamples);
337  long start, stride, limit;
338  if (cnt == nsamples) {
339  stride = 1;
340  start = 0;
341  limit = nsamples;
342  } else {
343  stride = (cnt / (nsamples + 1));
344  start = stride;
345  limit = (start + (stride * nsamples));
346  }
347  struct kmr_option inspect = {.inspect = 1};
348  cc = kmr_map_skipping(start, stride, limit, 0, kvi, 0,
349  bv, inspect, kmr_copy_to_array_fn);
350  assert(cc == MPI_SUCCESS);
351  }
352  return MPI_SUCCESS;
353 }
354 
355 /* Packs two integers into an array. */
356 
357 static int
358 kmr_minmax2_fn(const struct kmr_kv_box kv[], const long n,
359  const KMR_KVS *kvs, KMR_KVS *kvo, void *p)
360 {
361  long *range = p;
362  long range0 = LONG_MAX;
363  long range1 = LONG_MIN;
364  for (int i = 0; i < n; i++) {
365  long *mm = (void *)kv[i].v.p;
366  long m0 = mm[0];
367  long m1 = mm[1];
368  if (m0 < range0) {
369  range0 = m0;
370  }
371  if (m1 > range1) {
372  range1 = m1;
373  }
374  }
375  range[0] = range0;
376  range[1] = range1;
377  return MPI_SUCCESS;
378 }
379 
380 /** Sorts a key-value stream, by partitioning to equal ranges. It is
381  NOT-STABLE due to quick-sort used inside. It consumes an input
382  key-value stream unless INSPECT is specified. It assumes uniform
383  distribution, and partioning is simply determined by the range of
384  keys (MIN-MAX range is divided by nprocs). Effective-options:
385  NOTHREADING, INSPECT. See struct kmr_option. */
386 
387 int
388 kmr_sort_small(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
389 {
390  kmr_assert_kvs_ok(kvi, kvo, 1, 1);
391  assert(kmr_shuffle_compatible_p(kvo, kvi));
392  KMR *mr = kvi->c.mr;
393  struct kmr_option kmr_supported = {.nothreading = 1, .inspect = 1};
394  kmr_check_fn_options(mr, kmr_supported, opt, __func__);
395  int cc;
396  //int nprocs = mr->nprocs;
397  int rank = mr->rank;
398  struct kmr_option i_opt = kmr_copy_options_i_part(opt);
399  struct kmr_option o_opt = kmr_copy_options_o_part(opt);
400  struct kmr_option m_opt = kmr_copy_options_m_part(opt);
401  enum kmr_kv_field keyf = kvi->c.key_data;
402  enum kmr_kv_field valf = kvi->c.value_data;
403  KMR_KVS *kvs1 = kmr_create_kvs(mr, keyf, valf);
404  cc = kmr_sort_locally(kvi, kvs1, 0, i_opt);
405  assert(cc == MPI_SUCCESS);
406  /* Find sort key range (collect min/max keys). */
407  long mm[2];
408  long cnt = kvs1->c.element_count;
409  if (cnt == 0) {
410  mm[0] = 0;
411  mm[1] = 0;
412  } else if (cnt == 1) {
413  struct kmr_kvs_entry *
414  e0 = kmr_kvs_first_entry(kvs1, kvs1->c.first_block);
415  struct kmr_kv_box b0 = kmr_pick_kv(e0, kvs1);
416  long v = kmr_stable_key(b0, kvs1);
417  mm[0] = v;
418  mm[1] = v;
419  } else {
420  struct kmr_kvs_entry *
421  e0 = kmr_kvs_first_entry(kvs1, kvs1->c.first_block);
422  struct kmr_kvs_entry *e1 = kmr_find_kvs_last_entry(kvs1);
423  struct kmr_kv_box b0 = kmr_pick_kv(e0, kvs1);
424  struct kmr_kv_box b1 = kmr_pick_kv(e1, kvs1);
425  mm[0] = kmr_stable_key(b0, kvs1);
426  mm[1] = kmr_stable_key(b1, kvs1);
427  }
428  KMR_KVS *kvs2 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
429  struct kmr_kv_box kv = {
430  .klen = (int)sizeof(long),
431  .vlen = (int)sizeof(long[2]),
432  .k.i = rank,
433  .v.p = (void *)mm
434  };
435  cc = kmr_add_kv(kvs2, kv);
436  assert(cc == MPI_SUCCESS);
437  cc = kmr_add_kv_done(kvs2);
438  assert(cc == MPI_SUCCESS);
439  KMR_KVS *kvs3 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
440  cc = kmr_replicate(kvs2, kvs3, kmr_noopt);
441  assert(cc == MPI_SUCCESS);
442  long range[2] = {LONG_MAX, LONG_MIN};
443  cc = kmr_reduce_as_one(kvs3, 0, range, m_opt, kmr_minmax2_fn);
444  assert(cc == MPI_SUCCESS);
445  /* Partiton the array by destination ranks. */
446  KMR_KVS *kvs4 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
447  cc = kmr_map(kvs1, kvs4, range, m_opt, kmr_partition_by_ranking_fn);
448  assert(cc == MPI_SUCCESS);
449  KMR_KVS *kvs5 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
450  struct kmr_option ranking = {.key_as_rank = 1};
451  cc = kmr_shuffle(kvs4, kvs5, ranking);
452  assert(cc == MPI_SUCCESS);
453  KMR_KVS *kvs6 = kmr_create_kvs(mr, keyf, valf);
454  cc = kmr_unpairing(kvs5, kvs6, m_opt);
455  assert(cc == MPI_SUCCESS);
456  /* Locally sort. */
457  cc = kmr_sort_locally(kvs6, kvo, 0, o_opt);
458  assert(cc == MPI_SUCCESS);
459  return MPI_SUCCESS;
460 }
461 
462 /** Sorts a key-value stream by the regular or the random
463  sampling-sort. It is NOT-STABLE due to quick-sort used inside.
464  It consumes an input key-value stream unless INSPECT is specified.
465  It can be used for "GraySort". Effective-options: NOTHREADING,
466  INSPECT. See struct kmr_option. */
467 
468 int
469 kmr_sort_large(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
470 {
471  kmr_assert_kvs_ok(kvi, kvo, 1, 1);
472  assert(kmr_shuffle_compatible_p(kvo, kvi));
473  int cc;
474  KMR *mr = kvi->c.mr;
475  struct kmr_option kmr_supported = {.nothreading = 1, .inspect = 1};
476  kmr_check_fn_options(mr, kmr_supported, opt, __func__);
477  int nprocs = mr->nprocs;
478  struct kmr_option i_opt = kmr_copy_options_i_part(opt);
479  struct kmr_option o_opt = kmr_copy_options_o_part(opt);
480  struct kmr_option m_opt = kmr_copy_options_m_part(opt);
481  /* Let the number of samples less than factor^2 (100M by default). */
482  long factor = mr->sort_sample_factor;
483  long nsamples;
484  if (nprocs < factor) {
485  nsamples = (nprocs - 1);
486  } else {
487  nsamples = MAX(10, ((factor*factor) / nprocs));
488  }
489  if (mr->verbosity >= 7) {
490  if (mr->rank == 0) {
491  char ee[80];
492  snprintf(ee, sizeof(ee), "sort-oversampling=%e",
493  ((double)nsamples)/nprocs);
494  kmr_warning(mr, 7, ee);
495  }
496  }
497  /* Collect samples on each rank. */
498  enum kmr_kv_field keyf = kvi->c.key_data;
499  enum kmr_kv_field valf = kvi->c.value_data;
500  KMR_KVS *kvs1 = kmr_create_kvs(mr, keyf, valf);
501  cc = kmr_sample_kv(nsamples, kvi, kvs1);
502  assert(kvs1->c.element_count == MIN(kvi->c.element_count, nsamples));
503  KMR_KVS *kvs2 = kmr_create_kvs(mr, keyf, valf);
504  cc = kmr_replicate(kvs1, kvs2, m_opt);
505  assert(cc == MPI_SUCCESS);
506  KMR_KVS *kvs3 = kmr_create_kvs(mr, keyf, valf);
507  cc = kmr_sort_locally(kvs2, kvs3, 0, m_opt);
508  assert(cc == MPI_SUCCESS);
509  /* Choose boundary values. */
510  int nbreaks = (nprocs - 1);
511  assert(kvs3->c.element_count >= nbreaks);
512  struct kmr_kv_box *bv = kmr_malloc(sizeof(struct kmr_kv_box) * (size_t)nbreaks);
513  cc = kmr_sample_to_array(nbreaks, kvs3, bv);
514  assert(cc == MPI_SUCCESS);
515  /* Partition. */
516  KMR_KVS *kvs4 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
517  size_t prealloc = ((size_t)(16 * kvi->c.element_count) + kvi->c.storage_netsize);
518  cc = kmr_allocate_block(kvs4, prealloc);
519  assert(cc == MPI_SUCCESS);
520  cc = kmr_map(kvi, kvs4, bv, i_opt, kmr_rank_for_sort);
521  assert(cc == MPI_SUCCESS);
522  kmr_free(bv, (sizeof(struct kmr_kv_box) * (size_t)nbreaks));
523  cc = kmr_free_kvs(kvs3);
524  assert(cc == MPI_SUCCESS);
525  KMR_KVS *kvs5 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
526  struct kmr_option ranking = {.key_as_rank = 1};
527  cc = kmr_shuffle(kvs4, kvs5, ranking);
528  assert(cc == MPI_SUCCESS);
529  KMR_KVS *kvs6 = kmr_create_kvs(mr, keyf, valf);
530  cc = kmr_unpairing(kvs5, kvs6, m_opt);
531  assert(cc == MPI_SUCCESS);
532  /* Locally sort. */
533  cc = kmr_sort_locally(kvs6, kvo, 0, o_opt);
534  assert(cc == MPI_SUCCESS);
535  return MPI_SUCCESS;
536 }
537 
538 /** Sort by rank0, a degenerated case for small number of keys. It is
539  NOT-STABLE due to quick-sort used inside. It consumes an input
540  key-value stream unless INSPECT is specified. Effective-options:
541  INSPECT. See struct kmr_option. */
542 
543 int
544 kmr_sort_by_one(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
545 {
546  kmr_assert_kvs_ok(kvi, kvo, 1, 1);
547  assert(kmr_shuffle_compatible_p(kvo, kvi));
548  int cc;
549  KMR *mr = kvi->c.mr;
550  struct kmr_option kmr_supported = {.inspect = 1};
551  kmr_check_fn_options(mr, kmr_supported, opt, __func__);
552  struct kmr_option i_opt = kmr_copy_options_i_part(opt);
553  i_opt.rank_zero = 1;
554  struct kmr_option o_opt = kmr_copy_options_o_part(opt);
555  enum kmr_kv_field keyf = kvi->c.key_data;
556  enum kmr_kv_field valf = kvi->c.value_data;
557  KMR_KVS *kvs1 = kmr_create_kvs(mr, keyf, valf);
558  cc = kmr_replicate(kvi, kvs1, i_opt);
559  assert(cc == MPI_SUCCESS);
560  /* Locally sort. */
561  cc = kmr_sort_locally(kvs1, kvo, 0, o_opt);
562  assert(cc == MPI_SUCCESS);
563  return MPI_SUCCESS;
564 }
565 
566 /** Sorts a key-value stream globally. It is NOT-STABLE due to
567  quick-sort used inside. It consumes an input key-value stream
568  unless INSPECT is specified. It selects a sorting routine on the
569  total number of keys. See kmr_sort_large(), kmr_sort_small(), or
570  kmr_sort_by_one(). The results are stored as ascending ranks,
571  thus the rank0 holds the minimum. Effective-options: INSPECT.
572  See struct kmr_option. */
573 
574 int
575 kmr_sort(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
576 {
577  kmr_assert_kvs_ok(kvi, kvo, 1, 1);
578  struct kmr_option kmr_supported = {.inspect = 1};
579  kmr_check_fn_options(kvi->c.mr, kmr_supported, opt, __func__);
580  const long lb = kvi->c.mr->sort_trivial;
581  const long threshold = kvi->c.mr->sort_threshold;
582  int cc;
583  long cnt;
584  cc = kmr_get_element_count(kvi, &cnt);
585  assert(cc == MPI_SUCCESS);
586  if (cnt <= lb) {
587  return kmr_sort_by_one(kvi, kvo, opt);
588  } else if (cnt <= (threshold * kvi->c.mr->nprocs)) {
589  return kmr_sort_small(kvi, kvo, opt);
590  } else {
591  return kmr_sort_large(kvi, kvo, opt);
592  }
593 }
594 
595 /* Tags the value part with a given integer. The new key-value pair
596  is (key,(tag,value)) for each origial (key,value). */
597 
598 static int
599 kmr_tag_value_fn(const struct kmr_kv_box kv,
600  const KMR_KVS *kvi, KMR_KVS *kvo, void *p, const long i)
601 {
602  int cc;
603  long *tagp = (long *)p;
604  long tag = *tagp;
605  struct kmr_kv_box v = {.klen = (int)sizeof(long), .vlen = kv.vlen,
606  .k.i = tag, .v = kv.v};
607  cc = kmr_add_pairing_under_key(kvo, kv.klen, kv.k, v,
608  KMR_KV_INTEGER, kvi->c.value_data);
609  assert(cc == MPI_SUCCESS);
610  return MPI_SUCCESS;
611 }
612 
613 /* Creates key-value pairs from the values, collecting keys as
614  0-tagged values, and collecting values as 1-tagged values. That
615  is, for example, given a set {(k,(0,v0)), (k,(0,v1)), (k,(0,v2)),
616  (k,(1,v3)), (k,(1,v4))}, it creates {(v0,v3), (v0,v4), (v1,v3),
617  (v1,v4), (v2,v3), (v2,v4)}. MEMO: The dummyf is the fake data-type
618  of the value field, which is unknown until checking the key is 0 or
619  1. */
620 
621 static int
622 kmr_make_product_fn(const struct kmr_kv_box kv[], const long n,
623  const KMR_KVS *kvi, KMR_KVS *kvo, void *p)
624 {
625  int cc;
626  enum kmr_kv_field keyf = KMR_KV_INTEGER;
627  enum kmr_kv_field dummyf = KMR_KV_INTEGER;
628  enum kmr_kv_field valkf = kvo->c.key_data;
629  enum kmr_kv_field valvf = kvo->c.value_data;
630  long kcnt = 0;
631  long vcnt = 0;
632  for (long i = 0; i < n; i++) {
633  struct kmr_kvs_entry *e = (void *)kv[i].v.p;
634  struct kmr_kv_box vkv0 = kmr_pick_kv2(e, keyf, dummyf);
635  /*assert(vkv0.klen == sizeof(long));*/
636  long tag = vkv0.k.i;
637  assert(tag == 0 || tag == 1);
638  if (tag == 0) {
639  kcnt++;
640  } else {
641  vcnt++;
642  }
643  }
644  if (kcnt == 0 || vcnt == 0) {
645  return MPI_SUCCESS;
646  }
647  struct kmr_kv_box *
648  keys = kmr_malloc(sizeof(struct kmr_kv_box) * (size_t)kcnt);
649  struct kmr_kv_box *
650  vals = kmr_malloc(sizeof(struct kmr_kv_box) * (size_t)vcnt);
651  long ki = 0;
652  long vi = 0;
653  for (long i = 0; i < n; i++) {
654  struct kmr_kvs_entry *e = (void *)kv[i].v.p;
655  struct kmr_kv_box vkv0 = kmr_pick_kv2(e, keyf, dummyf);
656  /*assert(vkv0.klen == sizeof(long));*/
657  long tag = vkv0.k.i;
658  assert(tag == 0 || tag == 1);
659  if (tag == 0) {
660  struct kmr_kv_box vkv = kmr_pick_kv2(e, keyf, valkf);
661  keys[ki].klen = vkv.vlen;
662  keys[ki].k = vkv.v;
663  ki++;
664  } else {
665  struct kmr_kv_box vkv = kmr_pick_kv2(e, keyf, valvf);
666  vals[vi].vlen = vkv.vlen;
667  vals[vi].v = vkv.v;
668  vi++;
669  }
670  }
671  assert(ki == kcnt && vi == vcnt);
672  for (long i = 0; i < kcnt; i++) {
673  for (long j = 0; j < vcnt; j++) {
674  struct kmr_kv_box nkv = {.klen = keys[i].klen, .k = keys[i].k,
675  .vlen = vals[j].vlen, .v = vals[j].v};
676  cc = kmr_add_kv(kvo, nkv);
677  assert(cc == MPI_SUCCESS);
678  }
679  }
680  kmr_free(keys, (sizeof(struct kmr_kv_box) * (size_t)kcnt));
681  kmr_free(vals, (sizeof(struct kmr_kv_box) * (size_t)vcnt));
682  return MPI_SUCCESS;
683 }
684 
685 /** Makes key-value pairs as products of the two values in two
686  key-value stream. It creates a set of key-value pairs (ai,bj) of
687  the pairs (key,ai) from KVS0 and (key,bj) from KVS1 for the
688  matching key. It makes a direct-product of the values when
689  multiple values exist for a matching key. That is, for example,
690  given a set {(k,a0), (k,a1), (k,a2)} in KVS0 and {(k,b3), (k,b4)}
691  in KVS1 for some distinct key, it creates {(a0,b3), (a0,b4),
692  (a1,b3), (a1,b4), (a2,b3), (a2,b4)}. Effective-options:
693  NOTHREADNG. See struct kmr_option. */
694 
695 int
696 kmr_match(KMR_KVS *kvi0, KMR_KVS *kvi1, KMR_KVS *kvo, struct kmr_option opt)
697 {
698  kmr_assert_kvs_ok(kvi0, kvo, 1, 1);
699  kmr_assert_kvs_ok(kvi1, kvo, 1, 1);
700  struct kmr_option kmr_supported = {.nothreading = 1};
701  kmr_check_fn_options(kvi0->c.mr, kmr_supported, opt, __func__);
702  assert(kvi0->c.key_data == kvi1->c.key_data
703  && kvo->c.key_data == kvi0->c.value_data
704  && kvo->c.value_data == kvi1->c.value_data);
705  int cc;
706  KMR *mr = kvi0->c.mr;
707  struct kmr_option i_opt = kmr_copy_options_i_part(opt);
708  struct kmr_option o_opt = kmr_copy_options_o_part(opt);
709  /* Store two key-value streams into one. */
710  enum kmr_kv_field keyf = kvi0->c.key_data;
711  struct kmr_option keepopen = i_opt;
712  keepopen.keep_open = 1;
713  KMR_KVS *kvs2 = kmr_create_kvs(mr, keyf, KMR_KV_OPAQUE);
714  long tag0 = 0;
715  cc = kmr_map(kvi0, kvs2, &tag0, keepopen, kmr_tag_value_fn);
716  assert(cc == MPI_SUCCESS);
717  long tag1 = 1;
718  cc = kmr_map(kvi1, kvs2, &tag1, keepopen, kmr_tag_value_fn);
719  assert(cc == MPI_SUCCESS);
720  kmr_add_kv_done(kvs2);
721  /* Make products of two values. */
722  KMR_KVS *kvs3 = kmr_create_kvs(mr, keyf, KMR_KV_OPAQUE);
723  cc = kmr_shuffle(kvs2, kvs3, kmr_noopt);
724  assert(cc == MPI_SUCCESS);
725  cc = kmr_reduce(kvs3, kvo, 0, o_opt, kmr_make_product_fn);
726  assert(cc == MPI_SUCCESS);
727  return MPI_SUCCESS;
728 }
729 
730 static int
731 kmr_get_integer_values_fn(const struct kmr_kv_box kv[], const long n,
732  const KMR_KVS *kvs, KMR_KVS *kvo, void *p)
733 {
734  assert(kvo == 0 && kvs->c.mr->nprocs == n);
735  long *vec = p;
736  for (int i = 0; i < n; i++) {
737  vec[i] = kv[i].v.i;
738  }
739  return MPI_SUCCESS;
740 }
741 
742 static int
743 kmr_ranking_fn(const struct kmr_kv_box kv,
744  const KMR_KVS *kvi, KMR_KVS *kvo, void *p, const long i)
745 {
746  long rankingbase = *(long *)p;
747  int cc;
748  int klen = sizeof(long);
749  union kmr_unit_sized k = {.i = (rankingbase + i)};
750  cc = kmr_add_pairing_under_key(kvo, klen, k, kv,
751  kvi->c.key_data, kvi->c.value_data);
752  assert(cc == MPI_SUCCESS);
753  return MPI_SUCCESS;
754 }
755 
756 /** Assigns a ranking to key-value pairs, and returns the number of
757  the total elements in COUNT. Ranking is a position in the
758  key-value stream. That is, for example, given a sequence
759  {(k0,v0), (k1,v1), (k2,v2)}, it creates {(0,(k0,v0)), (1,(k1,v1)),
760  (2,(k2,v2))}. Effective-options: NOTHREADING, INSPECT, KEEP_OPEN.
761  See struct kmr_option.*/
762 
763 int
764 kmr_ranking(KMR_KVS *kvi, KMR_KVS *kvo, long *count, struct kmr_option opt)
765 {
766  /* Do the first part of kmr_get_element_count(). */
767  kmr_assert_kvs_ok(kvi, 0, 1, 0);
768  KMR *mr = kvi->c.mr;
769  struct kmr_option kmr_supported = {.nothreading = 1, .inspect = 1,
770  .keep_open = 1};
771  kmr_check_fn_options(mr, kmr_supported, opt, __func__);
772  const int nprocs = mr->nprocs;
773  const int rank = mr->rank;
774  struct kmr_option m_opt = kmr_copy_options_m_part(opt);
775  int cc;
776  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
777  struct kmr_kv_box kv = {.klen = (int)sizeof(long),
778  .vlen = (int)sizeof(long),
779  .k.i = 0,
780  .v.i = kvi->c.element_count};
781  cc = kmr_add_kv(kvs0, kv);
782  assert(cc == MPI_SUCCESS);
783  kmr_add_kv_done(kvs0);
784  /* Replicate and reduce to get a total. */
785  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
786  cc = kmr_replicate(kvs0, kvs1, m_opt);
787  assert(cc == MPI_SUCCESS);
788  long *vec0 = kmr_malloc(sizeof(long) * (size_t)nprocs);
789  cc = kmr_reduce(kvs1, 0, vec0, m_opt, kmr_get_integer_values_fn);
790  assert(cc == MPI_SUCCESS);
791  long *vec1 = kmr_malloc(sizeof(long) * (size_t)nprocs);
792  long scan = 0;
793  for (int i = 0; i < nprocs; i++) {
794  vec1[i] = scan;
795  scan += vec0[i];
796  }
797  cc = kmr_map(kvi, kvo, &vec1[rank], opt, kmr_ranking_fn);
798  assert(cc == MPI_SUCCESS);
799  if (count != 0) {
800  *count = scan;
801  }
802  kmr_free(vec1, (sizeof(long) * (size_t)nprocs));
803  kmr_free(vec0, (sizeof(long) * (size_t)nprocs));
804  return MPI_SUCCESS;
805 }
806 
/* Parameters of kmr_ranking_to_rank_fn(). */
struct kmr_ranking_to_rank {
    /* Takes the remainder by FACTOR when set, or the quotient by
       FACTOR otherwise. */
    _Bool cyclic;
    /* Divisor applied to a ranking. */
    long factor;
};
808 
809 static int
810 kmr_ranking_to_rank_fn(const struct kmr_kv_box kv,
811  const KMR_KVS *kvi, KMR_KVS *kvo, void *p, const long i)
812 {
813  struct kmr_ranking_to_rank *u = p;
814  int cc;
815  struct kmr_kv_box nkv = {.klen = kv.klen,
816  .vlen = kv.vlen,
817  .k.i = (u->cyclic
818  ? (kv.k.i % u->factor)
819  : (kv.k.i / u->factor)),
820  .v = kv.v};
821  cc = kmr_add_kv(kvo, nkv);
822  assert(cc == MPI_SUCCESS);
823  return MPI_SUCCESS;
824 }
825 
826 /** Distributes key-values so that each rank has approximately the
827  same number of pairs. It is used to level the load of mapping
828  among ranks by calling it before mapping. kmr_shuffle() can be
829  sufficient to distribute pairs in most cases, but sometimes it
830  results in uneven distribution because shuffling is based on
831  hashing on the keys. Effective-options: NOTHREADING, INSPECT,
832  KEEP_OPEN. See struct kmr_option. */
833 
834 int
835 kmr_distribute(KMR_KVS *kvi, KMR_KVS *kvo, _Bool cyclic, struct kmr_option opt)
836 {
837  kmr_assert_kvs_ok(kvi, 0, 1, 0);
838  struct kmr_option kmr_supported = {.nothreading = 1, .inspect = 1,
839  .keep_open = 1};
840  kmr_check_fn_options(kvi->c.mr, kmr_supported, opt, __func__);
841  struct kmr_option i_opt = kmr_copy_options_i_part(opt);
842  struct kmr_option o_opt = kmr_copy_options_o_part(opt);
843  struct kmr_option m_opt = kmr_copy_options_m_part(opt);
844  KMR *mr = kvi->c.mr;
845  const int nprocs = mr->nprocs;
846  int cc;
847  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
848  long count;
849  cc = kmr_ranking(kvi, kvs0, &count, i_opt);
850  assert(cc == MPI_SUCCESS);
851  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
852  struct kmr_ranking_to_rank u = {
853  .cyclic = cyclic,
854  .factor = (cyclic ? nprocs : ((count + nprocs - 1) / nprocs))
855  };
856  cc = kmr_map(kvs0, kvs1, &u, m_opt, kmr_ranking_to_rank_fn);
857  assert(cc == MPI_SUCCESS);
858  KMR_KVS *kvs2 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
859  struct kmr_option ranking = {.key_as_rank = 1};
860  cc = kmr_shuffle(kvs1, kvs2, ranking);
861  assert(cc == MPI_SUCCESS);
862  cc = kmr_unpairing(kvs2, kvo, o_opt);
863  assert(cc == MPI_SUCCESS);
864  return MPI_SUCCESS;
865 }
866 
867 /* Prefix-scans on one key-value from each rank. It is used to
868  calculate the start value of a scan on each rank. IT CURRENTLY
869  RUNS SEQUENTIALLY BY COLLECTING ENTRIES ON RANK0. */
870 
871 static int
872 kmr_scan_across_ranks_sequentially(KMR_KVS *kvi, KMR_KVS *kvo,
873  KMR_KVS *total, kmr_redfn_t r)
874 {
875  int cc;
876  KMR *mr = kvo->c.mr;
877  enum kmr_kv_field keyf = kvi->c.key_data;
878  enum kmr_kv_field valf = kvi->c.value_data;
879 
880  /* Collect partial scans (on each rank) to rank zero. */
881 
882  KMR_KVS *kvs0 = kmr_create_kvs(mr, keyf, valf);
883  struct kmr_option rankzero = {.rank_zero = 1};
884  cc = kmr_replicate(kvi, kvs0, rankzero);
885  assert(cc == MPI_SUCCESS);
886 
887  /* Scan on rank zero. */
888 
889  KMR_KVS *kvs1 = kmr_create_kvs(mr, keyf, valf);
890  KMR_KVS *kvo1 = kmr_create_kvs(mr, keyf, valf);
891  if (mr->rank == 0) {
892  KMR_KVS *zero = kmr_create_kvs(mr, keyf, valf);
893  cc = (*r)(0, 0, kvs0, zero, 0);
894  assert(cc == MPI_SUCCESS);
895  cc = kmr_add_kv_done(zero);
896  assert(cc == MPI_SUCCESS);
897 
898  cc = kmr_scan_locally(kvs0, zero, kvs1, kvo1, r);
899  assert(cc == MPI_SUCCESS);
900  } else {
901  kmr_free_kvs(kvs0);
902  cc = kmr_add_kv_done(kvs1);
903  assert(cc == MPI_SUCCESS);
904  cc = kmr_add_kv_done(kvo1);
905  assert(cc == MPI_SUCCESS);
906  }
907 
908  /* Scatter scan value from rank zero (by replacing keys with ranks). */
909 
910  KMR_KVS *kvs2 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
911  long rankingbase = 0;
912  cc = kmr_map(kvs1, kvs2, &rankingbase, kmr_noopt, kmr_ranking_fn);
913  assert(cc == MPI_SUCCESS);
914 
915  KMR_KVS *kvs3 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
916  struct kmr_option keyasrank = {.key_as_rank = 1};
917  cc = kmr_shuffle(kvs2, kvs3, keyasrank);
918  assert(cc == MPI_SUCCESS);
919 
920  cc = kmr_unpairing(kvs3, kvo, kmr_noopt);
921  assert(cc == MPI_SUCCESS);
922 
923  /* Copy carryover value (in total). */
924 
925  cc = kmr_replicate(kvo1, total, kmr_noopt);
926  assert(cc == MPI_SUCCESS);
927 
928  return MPI_SUCCESS;
929 }
930 
931 /** Prefix-scans every key-value with a reduce-function
932  (non-self-inclusively) and generates the final value in TOTAL (it
933  generates the same value on all the ranks in the TOTAL). The
934  key-values are scanned in the order in the KVS as they are
935  concatenated in the rank-order. The reduce-function should be
936  associative and free of side-effects (because it is called
937  multiple times on the same data). The reduce-function should
938  output a single key-value when given any number of key-value
939  pairs. Furthermore, it should output an identity element when it
940  is given zero key-value pairs. */
941 
942 int
944 {
945  int cc;
946  KMR *mr = kvo->c.mr;
947  enum kmr_kv_field keyf = kvi->c.key_data;
948  enum kmr_kv_field valf = kvi->c.value_data;
949 
950  /* Scan whole data on each rank. */
951 
952  KMR_KVS *kvs0 = kmr_create_kvs(mr, keyf, valf);
953  struct kmr_option inspect = {.inspect = 1};
954  cc = kmr_reduce_as_one(kvi, kvs0, 0, inspect, r);
955  assert(cc == MPI_SUCCESS);
956  assert(kvs0->c.element_count == 1);
957 
958  /* Scan among ranks. */
959 
960  KMR_KVS *kvs1 = kmr_create_kvs(mr, keyf, valf);
961  cc = kmr_scan_across_ranks_sequentially(kvs0, kvs1, total, r);
962  assert(cc == MPI_SUCCESS);
963  assert(kvs1->c.element_count == 1);
964  assert(total->c.element_count == 1);
965 
966  /* Scan with global initial values on each rank. */
967 
968  cc = kmr_scan_locally(kvi, kvs1, kvo, 0, r);
969  assert(cc == MPI_SUCCESS);
970 
971  return MPI_SUCCESS;
972 }
973 
974 static int
975 kmr_count_key_fn(const struct kmr_kv_box kv[], const long n,
976  const KMR_KVS *kvs, KMR_KVS *kvo, void *p)
977 {
978  struct kmr_kv_box nkv = {
979  .klen = kv[0].klen,
980  .k = kv[0].k,
981  .vlen = sizeof(long),
982  .v.i = n};
983  kmr_add_kv(kvo, nkv);
984  return MPI_SUCCESS;
985 }
986 
987 static int
988 kmr_sum_key_counts_fn(const struct kmr_kv_box kv[], const long n,
989  const KMR_KVS *kvs, KMR_KVS *kvo, void *p)
990 {
991  long c = 0;
992  for (long i = 0; i < n; i++) {
993  c += kv[i].v.i;
994  }
995  struct kmr_kv_box nkv = {
996  .klen = kv[0].klen,
997  .k = kv[0].k,
998  .vlen = sizeof(long),
999  .v.i = c};
1000  kmr_add_kv(kvo, nkv);
1001  return MPI_SUCCESS;
1002 }
1003 
1004 /* Counts appearances of keys in KVI and returns key-count pairs in
1005  KVO, in the same way running a word-count. It implies inspect on
1006  the input KVI. */
1007 
1008 static int
1009 kmr_count_keys(KMR_KVS *kvi, KMR_KVS *kvo)
1010 {
1011  int cc;
1012  KMR *mr = kvi->c.mr;
1013  enum kmr_kv_field keyf = kvi->c.key_data;
1014  struct kmr_option inspect = {.inspect = 1};
1015  KMR_KVS *kvs0 = kmr_create_kvs(mr, keyf, KMR_KV_INTEGER);
1016  cc = kmr_reduce(kvi, kvs0, 0, inspect, kmr_count_key_fn);
1017  assert(cc == MPI_SUCCESS);
1018  KMR_KVS *kvs1 = kmr_create_kvs(mr, keyf, KMR_KV_INTEGER);
1019  cc = kmr_shuffle(kvs0, kvs1, kmr_noopt);
1020  assert(cc == MPI_SUCCESS);
1021  cc = kmr_reduce(kvs1, kvo, 0, kmr_noopt, kmr_sum_key_counts_fn);
1022  assert(cc == MPI_SUCCESS);
1023  return MPI_SUCCESS;
1024 }
1025 
1026 /* Sums the value part for scan. The key field is unknown when n=0,
1027  assuming the keys are not used in scanning. */
1028 
1029 static int
1030 kmr_scan_sum_fn(const struct kmr_kv_box kv[], const long n,
1031  const KMR_KVS *kvs, KMR_KVS *kvo, void *p)
1032 {
1033  long sum = 0;
1034  for (long i = 0; i < n; i++) {
1035  sum += kv[i].v.i;
1036  }
1037  KMR *mr = kvs->c.mr;
1038  int keylen = kmr_legal_minimum_field_size(mr, kvs->c.key_data);
1039  struct kmr_kv_box nkv = {.klen = ((n != 0) ? (kv[0].klen) : keylen),
1040  .vlen = sizeof(long),
1041  .k.i = ((n != 0) ? (kv[0].k.i) : 0),
1042  .v.i = sum};
1043  kmr_add_kv(kvo, nkv);
1044  return MPI_SUCCESS;
1045 }
1046 
1047 /* Determines a target rank of a key from the partial sums of pair
1048  counts which are equally divided to the ranks. */
1049 
1050 static int
1051 kmr_partition_by_count_fn(const struct kmr_kv_box kv,
1052  const KMR_KVS *kvs, KMR_KVS *kvo, void *p,
1053  const long i)
1054 {
1055  KMR *mr = kvs->c.mr;
1056  const int nprocs = mr->nprocs;
1057  long nentries = *(long *)p;
1058  long partialsum = kv.v.i;
1059  assert(partialsum < nentries);
1060  long rank = (partialsum * nprocs) / nentries;
1061  struct kmr_kv_box nkv = {.klen = kv.klen,
1062  .vlen = kv.vlen,
1063  .k = kv.k,
1064  .v.i = rank};
1065  kmr_add_kv(kvo, nkv);
1066  return MPI_SUCCESS;
1067 }
1068 
1069 /** Shuffles key-values so that each rank has approximately the same
1070  number of pairs. It collects the same keys on a rank
1071  (cf. kmr_distribute()). */
1072 
1073 int
1075 {
1076  int cc;
1077  KMR *mr = kvi->c.mr;
1078  enum kmr_kv_field keyf = kvi->c.key_data;
1079  KMR_KVS *kvs0 = kmr_create_kvs(mr, keyf, KMR_KV_INTEGER);
1080  cc = kmr_count_keys(kvi, kvs0);
1081  assert(cc == MPI_SUCCESS);
1082 
1083  /* kvs0 holds (key, count). */
1084 
1085  KMR_KVS *kvs1 = kmr_create_kvs(mr, keyf, KMR_KV_INTEGER);
1086  KMR_KVS *total = kmr_create_kvs(mr, keyf, KMR_KV_INTEGER);
1087  cc = kmr_scan_on_values(kvs0, kvs1, total, kmr_scan_sum_fn);
1088  assert(cc == MPI_SUCCESS);
1089 
1090  /* kvs1 holds (key, prefix-sum-of-counts). */
1091 
1092  struct kmr_kv_box kv;
1093  cc = kmr_take_one(total, &kv);
1094  long nentries = kv.v.i;
1095  kmr_free_kvs(total);
1096 
1097  KMR_KVS *kvs2 = kmr_create_kvs(mr, keyf, KMR_KV_INTEGER);
1098  cc = kmr_map(kvs1, kvs2, &nentries, kmr_noopt, kmr_partition_by_count_fn);
1099  assert(cc == MPI_SUCCESS);
1100 
1101  /* kvs2 holds (key, rank). */
1102 
1103  KMR_KVS *kvs3 = kmr_create_kvs(mr, keyf, KMR_KV_OPAQUE);
1104  cc = kmr_pairing(kvi, kvs3, kmr_noopt);
1105  assert(cc == MPI_SUCCESS);
1106 
1107  /* kvs3 holds (key, (key, value)). */
1108 
1109  KMR_KVS *kvs4 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
1110  cc = kmr_match(kvs2, kvs3, kvs4, kmr_noopt);
1111  assert(cc == MPI_SUCCESS);
1112 
1113  /* kvs4 holds (rank, (key, value)). */
1114 
1115  struct kmr_option ranking = {.key_as_rank = 1};
1116  KMR_KVS *kvs5 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
1117  cc = kmr_shuffle(kvs4, kvs5, ranking);
1118  assert(cc == MPI_SUCCESS);
1119  cc = kmr_unpairing(kvs5, kvo, kmr_noopt);
1120  assert(cc == MPI_SUCCESS);
1121  return MPI_SUCCESS;
1122 }
1123 
1124 /* See kmr_unpairing_fn(). */
1125 
1126 static int
1127 kmr_first_n_elements_fn(const struct kmr_kv_box kv,
1128  const KMR_KVS *kvi, KMR_KVS *kvo, void *p,
1129  const long i)
1130 {
1131  long n = *(long *)p;
1132  if (kv.k.i < n) {
1133  struct kmr_kvs_entry *e = (void *)kv.v.p;
1134  struct kmr_kv_box nkv = kmr_pick_kv(e, kvo);
1135  kmr_add_kv(kvo, nkv);
1136  }
1137  return MPI_SUCCESS;
1138 }
1139 
1140 /** Chooses the first N entries from a key-value stream KVI. The
1141  option nothreading is implied to keep the ordering.
1142  Effective-options: INSPECT, KEEP_OPEN. See struct kmr_option. */
1143 
1144 int
1146  struct kmr_option opt)
1147 {
1148  kmr_assert_kvs_ok(kvi, kvo, 1, 1);
1149  KMR *mr = kvi->c.mr;
1150  struct kmr_option kmr_supported = {.inspect = 1, .keep_open = 1};
1151  kmr_check_fn_options(mr, kmr_supported, opt, __func__);
1152  struct kmr_option i_opt = kmr_copy_options_i_part(opt);
1153  int cc;
1154  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
1155  long count;
1156  cc = kmr_ranking(kvi, kvs0, &count, i_opt);
1157  assert(cc == MPI_SUCCESS);
1158  struct kmr_option opt1 = kmr_copy_options_o_part(opt);
1159  opt1.nothreading = 1;
1160  cc = kmr_map(kvs0, kvo, &n, opt1, kmr_first_n_elements_fn);
1161  assert(cc == MPI_SUCCESS);
1162  return MPI_SUCCESS;
1163 }
1164 
1165 /** Maps until some key-value are added. It stops processing, when
1166  the output is non-empty. It does not guarantee singleness.
1167  Existence/emptiness be checked by kmr_get_element_count(). */
1168 
1169 int
1171  void *arg, struct kmr_option opt, kmr_mapfn_t m)
1172 {
1173  int cc;
1174  cc = kmr_map9(1, kvi, kvo, arg, opt, m, __FILE__, __LINE__, __func__);
1175  return cc;
1176 }
1177 
1178 /** Reduces until some key-value are added. It stops processing, when
1179  the output is non-empty. It does not guarantee singleness.
1180  Existence/emptiness be checked by kmr_get_element_count(). */
1181 
1182 int
1183 kmr_reduce_for_some(KMR_KVS *kvi, KMR_KVS *kvo, void *arg,
1184  struct kmr_option opt, kmr_redfn_t r)
1185 {
1186  int cc;
1187  cc = kmr_reduce9(1, kvi, kvo, arg, opt, r, __FILE__, __LINE__, __func__);
1188  return cc;
1189 }
1190 
1191 /* ================================================================ */
1192 
1193 /** Returns an NTH entry of an n-tuple. It returns a pair of a length
1194  and a pointer. */
1195 
1196 struct kmr_ntuple_entry
1197 kmr_nth_ntuple(struct kmr_ntuple *u, int nth)
1198 {
1199  struct kmr_ntuple_entry e;
1200  assert(nth < u->n && u->index == u->n);
1201  char *p = (void *)u;
1202  size_t off = kmr_ntuple_nth_offset(u, nth);
1203  e.p = (p + off);
1204  e.len = u->len[nth];
1205  return e;
1206 }
1207 
1208 /** Returns the storage size of an n-tuple. */
1209 
1210 int
1212 {
1213  assert(u->index == u->n);
1214  return (int)kmr_ntuple_nth_offset(u, u->n);
1215 }
1216 
/** Returns the storage size of an n-tuple for N entries with LEN[i]
    size each.  It is the offset of the data part for N entries plus
    the sum of the entry sizes for each length. */

int
kmr_size_ntuple_by_lengths(int n, int len[])
{
    size_t off;
    off = kmr_ntuple_data_offset(n);
    for (int i = 0; i < n; i++) {
        off += (size_t)kmr_ntuple_entry_size(len[i]);
    }
    return (int)off;
}
1230 
1231 /** Resets an n-tuple U with N entries and a MARKER. */
1232 
1233 void
1234 kmr_reset_ntuple(struct kmr_ntuple *u, int n, int marker)
1235 {
1236  assert(n <= USHRT_MAX);
1237  u->marker = marker;
1238  u->n = (unsigned short)n;
1239  u->index = 0;
1240  int w = (KMR_ALIGN((int)sizeof(u->len[0]) * n) / (int)sizeof(u->len[0]));
1241  for (int i = 0; i < w; i++) {
1242  u->len[i] = 0;
1243  }
1244 }
1245 
1246 /** Adds an entry V with LEN in an n-tuple U whose size is limited to
1247  SIZE. An n-tuple should be initialized by kmr_reset_ntuple()
1248  first. Note it fills with zeros the gap of the alignment padding,
1249  allowing the n-tuples be used as opaque keys. */
1250 
1251 int
1252 kmr_put_ntuple(KMR *mr, struct kmr_ntuple *u, const int size,
1253  const void *v, const int len)
1254 {
1255  assert(u->index < u->n);
1256  char *p = kmr_ntuple_insertion_point(u);
1257  int sz = kmr_ntuple_entry_size(len);
1258  char *end = ((char *)p + sz);
1259  if (!(end <= ((char *)(void *)u + size))) {
1260  kmr_error(mr, "kmr_put_ntuple exceeds the buffer size");
1261  return -1;
1262  }
1263  u->len[u->index] = (unsigned short)len;
1264  memcpy(p, v, (size_t)len);
1265  memset((p + len), 0, (size_t)(end - (p + len)));
1266  u->index++;
1267  return MPI_SUCCESS;
1268 }
1269 
1270 /** Adds an integer value in an n-tuple U whose size is limited to
1271  SIZE. See kmr_put_ntuple(). */
1272 
1273 int
1274 kmr_put_ntuple_long(KMR *mr, struct kmr_ntuple *u, const int sz, long v)
1275 {
1276  int cc = kmr_put_ntuple(mr, u, sz, &v, sizeof(long));
1277  return cc;
1278 }
1279 
1280 /** Adds an n-tuple entry E in an n-tuple U whose size is limited to
1281  SIZE. See kmr_put_ntuple(). */
1282 
1283 int
1284 kmr_put_ntuple_entry(KMR *mr, struct kmr_ntuple *u, const int sz,
1285  struct kmr_ntuple_entry e)
1286 {
1287  int cc = kmr_put_ntuple(mr, u, sz, e.p, e.len);
1288  return cc;
1289 }
1290 
1291 /** Adds an n-tuple U with a given key K and KLEN in a key-value
1292  stream KVO. */
1293 
1294 int
1295 kmr_add_ntuple(KMR_KVS *kvo, void *k, int klen, struct kmr_ntuple *u)
1296 {
1297  assert(u->index == u->n);
1298  assert(kvo->c.value_data == KMR_KV_OPAQUE
1299  || kvo->c.value_data == KMR_KV_CSTRING);
1300  struct kmr_kv_box kv = {
1301  .klen = klen,
1302  .vlen = kmr_size_ntuple(u),
1303  .k.p = k,
1304  .v.p = (void *)u
1305  };
1306  kmr_add_kv(kvo, kv);
1307  return MPI_SUCCESS;
1308 }
1309 
1310 /** Separates the n-tuples stored in the value part of KV into the two
1311  sets by their marker values. It is intended to be used in reduce
1312  functions. It separates the n-tuples to the first set by
1313  marker=MARKERS[0] and to the second set by marker=MARKERS[1]. It
1314  returns two malloced arrays in VV with their sizes in CNT. The
1315  arrays VV[0] and VV[1] should be freed by the caller. */
1316 
1317 int
1319  const struct kmr_kv_box kv[], const long n,
1320  struct kmr_ntuple **vv[2], long cnt[2],
1321  int markers[2], _Bool disallow_other_entries)
1322 {
1323  long c0;
1324  long c1;
1325 
1326  c0 = 0;
1327  c1 = 0;
1328  for (long i = 0; i < n; i++) {
1329  struct kmr_ntuple *u = (void *)kv[i].v.p;
1330  if (u->marker == markers[0]) {
1331  c0++;
1332  } else if (u->marker == markers[1]) {
1333  c1++;
1334  } else {
1335  /* Ignore. */
1336  if (disallow_other_entries) {
1337  assert(u->marker == markers[0] || u->marker == markers[1]);
1338  }
1339  }
1340  }
1341 
1342  long cnt0 = c0;
1343  long cnt1 = c1;
1344  struct kmr_ntuple **
1345  v0 = kmr_malloc(sizeof(struct kmr_ntuple *) * (size_t)cnt0);
1346  struct kmr_ntuple **
1347  v1 = kmr_malloc(sizeof(struct kmr_ntuple *) * (size_t)cnt1);
1348 
1349  c0 = 0;
1350  c1 = 0;
1351  for (long i = 0; i < n; i++) {
1352  struct kmr_ntuple *u = (void *)kv[i].v.p;
1353  if (u->marker == markers[0]) {
1354  v0[c0] = u;
1355  c0++;
1356  } else if (u->marker == markers[1]) {
1357  v1[c1] = u;
1358  c1++;
1359  } else {
1360  /* Ignore. */
1361  }
1362  }
1363  assert(cnt0 == c0 && cnt1 == c1);
1364 
1365  vv[0] = v0;
1366  vv[1] = v1;
1367  cnt[0] = cnt0;
1368  cnt[1] = cnt1;
1369  return MPI_SUCCESS;
1370 }
1371 
1372 static inline int
1373 kmr_product_ntuples_by_space(KMR_KVS *kvo,
1374  struct kmr_ntuple **vv[2], long cnt[2],
1375  int marker,
1376  int slots[][2], int nslots,
1377  int keys[][2], int nkeys,
1378  _Bool byspace)
1379 {
1380  KMR *mr = kvo->c.mr;
1381  const int AUNIT = 1024;
1382 
1383  void *keyp;
1384  void *valuep;
1385  size_t keysz;
1386  size_t valuesz;
1387 
1388  keyp = 0;
1389  keysz = 0;
1390  valuep = 0;
1391  valuesz = 0;
1392 
1393  struct kmr_ntuple *ee[2];
1394  for (long k0 = 0; k0 < cnt[0]; k0++) {
1395  ee[0] = vv[0][k0];
1396  for (long k1 = 0; k1 < cnt[1]; k1++) {
1397  ee[1] = vv[1][k1];
1398 
1399  int klen;
1400  if (nkeys == 1) {
1401  int *choice = keys[0];
1402  struct kmr_ntuple_entry
1403  e = kmr_nth_ntuple(ee[choice[1]], choice[0]);
1404  klen = e.len;
1405  } else {
1406  int sz;
1407  sz = (int)kmr_ntuple_data_offset(nkeys);
1408  for (int i = 0; i < nkeys; i++) {
1409  int *choice = keys[i];
1410  struct kmr_ntuple_entry
1411  e = kmr_nth_ntuple(ee[choice[1]], choice[0]);
1412  sz += kmr_ntuple_entry_size(e.len);
1413  }
1414  klen = sz;
1415  }
1416 
1417  int vlen;
1418  {
1419  int sz;
1420  sz = (int)kmr_ntuple_data_offset(nslots);
1421  for (int i = 0; i < nslots; i++) {
1422  int *choice = slots[i];
1423  struct kmr_ntuple_entry
1424  e = kmr_nth_ntuple(ee[choice[1]], choice[0]);
1425  sz += kmr_ntuple_entry_size(e.len);
1426  }
1427  vlen = sz;
1428  }
1429 
1430  if (byspace) {
1431  struct kmr_kv_box nkv = {
1432  .klen = klen,
1433  .k.p = 0,
1434  .vlen = vlen,
1435  .v.p = 0
1436  };
1437  keyp = 0;
1438  valuep = 0;
1439  kmr_add_kv_space(kvo, nkv, &keyp, &valuep);
1440  } else {
1441  if ((size_t)klen > keysz) {
1442  kmr_free(keyp, keysz);
1443  keysz = (size_t)((klen + AUNIT - 1) & ~(AUNIT - 1));
1444  keyp = kmr_malloc(keysz);
1445  }
1446  if ((size_t)vlen > valuesz) {
1447  kmr_free(valuep, valuesz);
1448  valuesz = (size_t)((vlen + AUNIT - 1) & ~(AUNIT - 1));
1449  valuep = kmr_malloc(valuesz);
1450  }
1451  assert(keysz > (size_t)klen);
1452  assert(valuesz > (size_t)vlen);
1453  }
1454 
1455  /* Fill key. */
1456 
1457  if (nkeys == 1) {
1458  int *keychoice = keys[0];
1459  struct kmr_ntuple_entry
1460  e = kmr_nth_ntuple(ee[keychoice[1]], keychoice[0]);
1461  memcpy(keyp, e.p, (size_t)e.len);
1462  assert(klen == e.len);
1463  } else {
1464  struct kmr_ntuple *k = (void *)keyp;
1465  kmr_reset_ntuple(k, nkeys, 0);
1466  for (int i = 0; i < nkeys; i++) {
1467  int *choice = keys[i];
1468  struct kmr_ntuple_entry
1469  e = kmr_nth_ntuple(ee[choice[1]], choice[0]);
1470  assert(i == k->index);
1471  kmr_put_ntuple_entry(mr, k, (int)klen, e);
1472  }
1473  assert(klen == kmr_size_ntuple(k));
1474  }
1475 
1476  /* Fill value. */
1477 
1478  {
1479  struct kmr_ntuple *v = (void *)valuep;
1480  kmr_reset_ntuple(v, nslots, marker);
1481  for (int i = 0; i < nslots; i++) {
1482  int *choice = slots[i];
1483  struct kmr_ntuple_entry
1484  e = kmr_nth_ntuple(ee[choice[1]], choice[0]);
1485  assert(i == v->index);
1486  kmr_put_ntuple_entry(mr, v, (int)vlen, e);
1487  }
1488  assert(vlen == kmr_size_ntuple(v));
1489  }
1490 
1491  if (!byspace) {
1492  struct kmr_kv_box nkv = {
1493  .klen = klen,
1494  .k.p = keyp,
1495  .vlen = vlen,
1496  .v.p = valuep
1497  };
1498  kmr_add_kv(kvo, nkv);
1499  }
1500  }
1501  }
1502 
1503  if (!byspace) {
1504  if (keyp != 0) {
1505  kmr_free(keyp, keysz);
1506  }
1507  if (valuep != 0) {
1508  kmr_free(valuep, valuesz);
1509  }
1510  }
1511 
1512  return MPI_SUCCESS;
1513 }
1514 
1515 /** Makes a direct product of the two sets of n-tuples VV[0] and VV[1]
1516  with their counts in CNT[0] and CNT[1]. It is intended to be used
1517  in reduce functions. The resulting n-tuples are created by SLOTS,
1518  which chooses i-th entry of the n-tuples by the SLOTS[i][0]-th
1519  entry from the the SLOTS[i][1] set, 0 from the first set and 1
1520  from the second set. The product n-tuples have MARKER and are
1521  inserted into KVO under the new key. The new key is selected like
1522  values using KEYS[j][0] and KEYS[j][1]. The key is not an n-tuple
1523  when NKEYS=1, or an n-tuple of KEYS[j] entries. The n-tuple key
1524  has zero as a marker. Note that it does not remove duplicate
1525  entries. */
1526 
1527 int
1529  struct kmr_ntuple **vv[2], long cnt[2],
1530  int marker,
1531  int slots[][2], int nslots,
1532  int keys[][2], int nkeys)
1533 {
1534  int cc;
1535  if (kvo->c.magic == KMR_KVS_ONCORE) {
1536  cc = kmr_product_ntuples_by_space(kvo, vv, cnt, marker,
1537  slots, nslots, keys, nkeys,
1538  1);
1539  return cc;
1540  } else if (kvo->c.magic == KMR_KVS_PUSHOFF) {
1541  cc = kmr_product_ntuples_by_space(kvo, vv, cnt, marker,
1542  slots, nslots, keys, nkeys,
1543  0);
1544  return cc;
1545  } else {
1546  assert((kvo->c.magic == KMR_KVS_ONCORE)
1547  || (kvo->c.magic == KMR_KVS_PUSHOFF));
1548  assert(NEVERHERE);
1549  return 0;
1550  }
1551 }
1552 
1553 static int
1554 kmr_put_integer_to_array_fn(const struct kmr_kv_box kv,
1555  const KMR_KVS *kvi, KMR_KVS *kvo,
1556  void *p, const long i)
1557 {
1558  long *v = p;
1559  v[i] = kv.v.i;
1560  return MPI_SUCCESS;
1561 }
1562 
1563 /** Fills an integer array FRQ[i] with the count of the elements of
1564  each rank. The array FRQ be as large as nprocs. It also fills
1565  VAR[0]=average, VAR[1]=variance, VAR[2]=min, and VAR[3]=max. FRQ
1566  or VAR can be null. */
1567 
1568 int
1569 kmr_histogram_count_by_ranks(KMR_KVS *kvs, long *frq, double *var,
1570  _Bool rankzeroonly)
1571 {
1572  kmr_assert_kvs_ok(kvs, 0, 1, 0);
1573  assert(kvs->c.magic == KMR_KVS_ONCORE);
1574  KMR *mr = kvs->c.mr;
1575  int nprocs = mr->nprocs;
1576  int cc;
1577 
1578  long *vec;
1579  if (frq != 0) {
1580  vec = frq;
1581  } else {
1582  vec = kmr_malloc(sizeof(long) * (size_t)nprocs);
1583  }
1584 
1585  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
1586  struct kmr_kv_box nkv = {
1587  .klen = sizeof(long),
1588  .k.i = mr->rank,
1589  .vlen = sizeof(long),
1590  .v.i = kvs->c.element_count
1591  };
1592  kmr_add_kv(kvs0, nkv);
1593  kmr_add_kv_done(kvs0);
1594  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
1595  struct kmr_option opt = {.rank_zero = rankzeroonly};
1596  cc = kmr_replicate(kvs0, kvs1, opt);
1597  assert(cc == MPI_SUCCESS);
1598  /*KMR_KVS *kvs2 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);*/
1599  /*cc = kmr_sort_locally(kvs1, kvs2, 0, kmr_noopt);*/
1600  /*assert(cc == MPI_SUCCESS);*/
1601  cc = kmr_map(kvs1, 0, vec, kmr_noopt, kmr_put_integer_to_array_fn);
1602  assert(cc == MPI_SUCCESS);
1603 
1604  if (var != 0 && (!rankzeroonly || mr->rank == 0)) {
1605  double min = (double)LONG_MAX;
1606  double max = 0.0;
1607  double s1 = 0.0;
1608  double s2 = 0.0;
1609  for (int r = 0; r < nprocs; r++) {
1610  double x = (double)vec[r];
1611  s1 += x;
1612  s2 += (x * x);
1613  min = MIN(min, x);
1614  max = MAX(max, x);
1615  }
1616  double a1 = (s1 / (double)nprocs);
1617  double a2 = (s2 / (double)nprocs);
1618  var[0] = a1;
1619  var[1] = a2 - (a1 * a1);
1620  var[2] = min;
1621  var[3] = max;
1622  }
1623 
1624  if (frq == 0) {
1625  kmr_free(vec, sizeof(long) * (size_t)nprocs);
1626  };
1627 
1628  return MPI_SUCCESS;
1629 }
1630 
1631 /*
1632 Copyright (C) 2012-2016 RIKEN AICS
1633 This library is distributed WITHOUT ANY WARRANTY. This library can be
1634 redistributed and/or modified under the terms of the BSD 2-Clause License.
1635 */
int kmr_map_for_some(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps until some key-value are added.
Definition: kmrmoreops.c:1170
int kmr_product_ntuples(KMR_KVS *kvo, struct kmr_ntuple **vv[2], long cnt[2], int marker, int slots[][2], int nslots, int keys[][2], int nkeys)
Makes a direct product of the two sets of n-tuples VV[0] and VV[1] with their counts in CNT[0] and CN...
Definition: kmrmoreops.c:1528
Key-Value Stream (abstract).
Definition: kmr.h:587
Utilities Private Part (do not include from applications).
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs.
Definition: kmr.h:88
Options to Mapping, Shuffling, and Reduction.
Definition: kmr.h:613
#define KMR_ALIGN(X)
Rounds up a given size to the alignment restriction (currently eight bytes).
Definition: kmrimpl.h:75
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
Definition: kmrbase.c:751
#define kmr_malloc(Z)
Allocates memory, or aborts when failed.
Definition: kmrimpl.h:177
int kmr_map9(_Bool stop_when_some_added, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m, const char *, const int, const char *)
Maps simply.
Definition: kmrbase.c:1289
int kmr_put_ntuple_entry(KMR *mr, struct kmr_ntuple *u, const int sz, struct kmr_ntuple_entry e)
Adds an n-tuple entry E in an n-tuple U whose size is limited to SIZE.
Definition: kmrmoreops.c:1284
int kmr_scan_locally(KMR_KVS *kvi, KMR_KVS *carryin, KMR_KVS *kvo, KMR_KVS *carryout, kmr_redfn_t r)
Scans every key-value with a reduce-function locally (independently on each rank).
Definition: kmrbase.c:2880
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
Definition: kmr.h:71
int kmr_reverse(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Makes a new pair by swapping the key and the value in each pair.
Definition: kmrmoreops.c:159
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
Definition: kmrbase.c:2036
int kmr_copy_to_array_fn(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long i)
Copies the entry in the array.
Definition: kmrutil.c:934
int kmr_pairing(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replaces a value part with a key-value pairing.
Definition: kmrmoreops.c:212
int kmr_reduce9(_Bool stop_when_some_added, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_redfn_t r, const char *, const int, const char *)
Reduces key-value pairs.
Definition: kmrbase.c:2549
int kmr_sort(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Sorts a key-value stream globally.
Definition: kmrmoreops.c:575
int kmr_ranking(KMR_KVS *kvi, KMR_KVS *kvo, long *count, struct kmr_option opt)
Assigns a ranking to key-value pairs, and returns the number of the total elements in COUNT...
Definition: kmrmoreops.c:764
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
Definition: kmrbase.c:881
Definition: kmr.h:348
int kmr_histogram_count_by_ranks(KMR_KVS *kvs, long *frq, double *var, _Bool rankzeroonly)
Fills an integer array FRQ[i] with the count of the elements of each rank.
Definition: kmrmoreops.c:1569
KMR Context.
Definition: kmr.h:222
int kmr_add_kv_space(KMR_KVS *kvs, const struct kmr_kv_box kv, void **keyp, void **valuep)
Adds a key-value pair, but only allocates a space and returns the pointers to the key and the value p...
Definition: kmrbase.c:843
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
Definition: kmrbase.c:621
int kmr_separate_ntuples(KMR *mr, const struct kmr_kv_box kv[], const long n, struct kmr_ntuple **vv[2], long cnt[2], int markers[2], _Bool disallow_other_entries)
Separates the n-tuples stored in the value part of KV into the two sets by their marker values...
Definition: kmrmoreops.c:1318
int kmr_distribute(KMR_KVS *kvi, KMR_KVS *kvo, _Bool cyclic, struct kmr_option opt)
Distributes key-values so that each rank has approximately the same number of pairs.
Definition: kmrmoreops.c:835
int kmr_sort_by_one(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Sort by rank0, a degenerated case for small number of keys.
Definition: kmrmoreops.c:544
struct kmr_kvs_entry * kmr_find_kvs_last_entry(KMR_KVS *kvs)
Finds the last entry of a key-value stream.
Definition: kmrbase.c:2759
kmr_kv_field
Datatypes of Keys or Values.
Definition: kmr.h:325
int kmr_take_one(KMR_KVS *kvi, struct kmr_kv_box *kv)
Extracts a single key-value pair locally in the key-value stream KVI.
Definition: kmrbase.c:1369
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Definition: kmr.h:82
Handy Copy of a Key-Value Field.
Definition: kmr.h:358
int kmr_find_key(KMR_KVS *kvi, struct kmr_kv_box ki, struct kmr_kv_box *ko)
Finds a key-value pair for a key.
Definition: kmrmoreops.c:43
int kmr_match(KMR_KVS *kvi0, KMR_KVS *kvi1, KMR_KVS *kvo, struct kmr_option opt)
Makes key-value pairs as products of the two values in two key-value stream.
Definition: kmrmoreops.c:696
struct kmr_ntuple_entry kmr_nth_ntuple(struct kmr_ntuple *u, int nth)
Returns an NTH entry of an n-tuple.
Definition: kmrmoreops.c:1197
int kmr_sort_small(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Sorts a key-value stream, by partitioning to equal ranges.
Definition: kmrmoreops.c:388
int kmr_scan_on_values(KMR_KVS *kvi, KMR_KVS *kvo, KMR_KVS *total, kmr_redfn_t r)
Prefix-scans every key-value with a reduce-function (non-self-inclusively) and generates the final va...
Definition: kmrmoreops.c:943
void * kmr_bsearch(const void *key, const void *base, size_t nel, size_t size, int(*compar)(const void *, const void *))
Searches a key entry like bsearch(3C), but returns a next greater entry instead of null on no match...
Definition: kmrutil.c:565
int kmr_sort_large(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Sorts a key-value stream by the regular or the random sampling-sort.
Definition: kmrmoreops.c:469
int kmr_add_ntuple(KMR_KVS *kvo, void *k, int klen, struct kmr_ntuple *u)
Adds an n-tuple U with a given key K and KLEN in a key-value stream KVO.
Definition: kmrmoreops.c:1295
int kmr_map_skipping(long from, long stride, long limit, _Bool stop_when_some_added, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps by skipping the number of entries.
Definition: kmrbase.c:1134
int kmr_unpairing(KMR_KVS *kvs, KMR_KVS *kvo, struct kmr_option opt)
Extracts a key-value pair from a pairing in the value part, discarding the original key...
Definition: kmrmoreops.c:234
int kmr_reduce_for_some(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_redfn_t r)
Reduces until some key-value are added.
Definition: kmrmoreops.c:1183
int kmr_choose_first_part(KMR_KVS *kvi, KMR_KVS *kvo, long n, struct kmr_option opt)
Chooses the first N entries from a key-value stream KVI.
Definition: kmrmoreops.c:1145
KMR Interface.
int kmr_add_identity_fn(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long i)
Adds a given key-value pair unmodified.
Definition: kmrbase.c:937
void kmr_reset_ntuple(struct kmr_ntuple *u, int n, int marker)
Resets an n-tuple U with N entries and a MARKER.
Definition: kmrmoreops.c:1234
int kmr_legal_minimum_field_size(KMR *mr, enum kmr_kv_field f)
Returns a minimum byte size of the field: 8 for INTEGER and FLOAT8, 0 for others. ...
Definition: kmrbase.c:2847
Unit-Sized Storage.
Definition: kmr.h:340
int kmr_get_element_count(KMR_KVS *kvs, long *v)
Gets the total number of key-value pairs.
Definition: kmrmoreops.c:114
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gat...
Definition: kmrbase.c:2182
int kmr_size_ntuple_by_lengths(int n, int len[])
Returns the storage size of an n-tuple for N entries with LEN[i] size each.
Definition: kmrmoreops.c:1221
static struct kmr_kv_box kmr_pick_kv(struct kmr_kvs_entry *e, KMR_KVS *kvs)
Returns a handle to a key-value entry – a reverse of kmr_poke_kv().
Definition: kmrimpl.h:551
N-Tuple.
Definition: kmr.h:731
int kmr_sort_locally(KMR_KVS *kvi, KMR_KVS *kvo, _Bool shuffling, struct kmr_option opt)
Reorders key-value pairs in a single rank.
Definition: kmrbase.c:1993
int kmr_put_ntuple_long(KMR *mr, struct kmr_ntuple *u, const int sz, long v)
Adds an integer value in an n-tuple U whose size is limited to SIZE.
Definition: kmrmoreops.c:1274
int kmr_reduce_as_one(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_redfn_t r)
Calls a reduce-function once as if all key-value pairs had the same key.
Definition: kmrbase.c:2625
int(* kmr_redfn_t)(const struct kmr_kv_box kv[], const long n, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg)
Reduce-function Type.
Definition: kmr.h:700
int kmr_shuffle_leveling_pair_count(KMR_KVS *kvi, KMR_KVS *kvo)
Shuffles key-values so that each rank has approximately the same number of pairs. ...
Definition: kmrmoreops.c:1074
int kmr_put_ntuple(KMR *mr, struct kmr_ntuple *u, const int size, const void *v, const int len)
Adds an entry V with LEN in an n-tuple U whose size is limited to SIZE.
Definition: kmrmoreops.c:1252
N-Tuple Argument.
Definition: kmr.h:740
int(* kmr_mapfn_t)(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long index)
Map-function Type.
Definition: kmr.h:689
int kmr_size_ntuple(struct kmr_ntuple *u)
Returns the storage size of an n-tuple.
Definition: kmrmoreops.c:1211
int kmr_find_string(KMR_KVS *kvi, const char *k, const char **vq)
Finds the key K in the key-value stream KVS.
Definition: kmrmoreops.c:73