KMR
kmrbase.c
Go to the documentation of this file.
1 /* kmrbase.c (2014-02-04) */
2 /* Copyright (C) 2012-2016 RIKEN AICS */
3 
4 /** \file kmrbase.c KMR Base Implementation (on-memory operations).
5  KMR aims at fast shuffling and scalability, and provides modest
6  utilities for programming with map-reduce. This part implements
7  on-memory operations. */
8 
9 /* NOTE: (1) KMR and KMR_KVS are handled collectively (allocated,
10  modified, and freed). */
11 
12 #include <mpi.h>
13 #include <stdlib.h>
14 #include <string.h>
15 #include <unistd.h>
16 #include <limits.h>
17 #include <errno.h>
18 #include <assert.h>
19 #include <ctype.h>
20 #ifdef _OPENMP
21 #include <omp.h>
22 #endif
23 #include "../config.h"
24 #include "kmr.h"
25 #include "kmrimpl.h"
26 #include "kmrtrace.h"
27 
/* Identifier of the KMR API; it is not referenced in this part of the
   file (NOTE(review): confirm its purpose). */
28 int KMR_API_ID = 0;
/* Version stamp taken from the KMR_H macro (presumably defined in
   kmr.h -- TODO confirm). */
29 const int kmr_version = KMR_H;
30 
/* MIN and MAX evaluate the selected argument twice; do not pass
   expressions with side effects. */
31 #define MIN(a,b) (((a)<(b))?(a):(b))
32 #define MAX(a,b) (((a)>(b))?(a):(b))
/* Always-false condition, used as assert(NEVERHERE) to mark branches
   that must never be reached. */
33 #define NEVERHERE 0
34 
35 /* Default unit of allocation of memory block for key-value pairs
36  (64 MiB). See preset_block_size in struct kmr_ctx. */
37 
38 #define BLOCK_SIZE (64 * 1024 * 1024)
39 
40 /* Default of the number of entries pooled before calling a
41  map-function. See mapper_park_size in struct kmr_ctx. */
42 
43 #define MAP_PARK_SIZE (1024)
44 
45 /* Default block size of a push-off key-value stream (64 KiB). See
46  pushoff_block_size in struct kmr_ctx. */
47 
48 #define PUSHOFF_SIZE (64 * 1024)
49 
50 /* Checks if an end-of-block marker is properly placed. It only
51  checks when a key-value stream consists of a single preallocated
52  block. */
53 
54 static inline void
55 kmr_assert_on_tail_marker(KMR_KVS *kvs)
56 {
57  if (kvs != 0 && kvs->c.block_count == 1) {
58  struct kmr_kvs_block *b = kvs->c.first_block;
59  size_t netsz = kvs->c.storage_netsize;
60  struct kmr_kvs_entry *e = kmr_kvs_entry_at(kvs, b, netsz);
61  assert((((intptr_t)e) & 3) == 0
62  && e->klen == -1 && e->vlen == -1);
63  }
64 }
65 
66 /* Sets up the environment (nothing currently). It checks the data
67  sizes in C meets the KMR assumptions. It also checks MPI_LONG has
68  8-byte size, that is assumed in all-to-all communication. It calls
69  an OMP function, to initialize the threads environment here. */
70 
71 int
72 kmr_init_2(int ignore)
73 {
74  int cc;
75  assert(sizeof(long) == sizeof(size_t)
76  && sizeof(long) == sizeof(ssize_t)
77  && sizeof(long) == sizeof(off_t)
78  && sizeof(long) == sizeof(uint64_t)
79  && sizeof(long) >= sizeof(intptr_t)
80  && sizeof(long) >= sizeof(void *));
81  assert(kmr_check_alignment(offsetof(struct kmr_kvs_entry, c)));
82  assert(kmr_check_alignment(offsetof(struct kmr_kvs_block, data)));
83  assert(kmr_check_alignment(offsetof(struct kmr_ntuple_entry, len)));
84  assert(sizeof(struct kmr_option) == sizeof(long)
85  && sizeof(struct kmr_file_option) == sizeof(long)
86  && sizeof(struct kmr_spawn_option) == sizeof(long));
87  MPI_Aint lb;
88  MPI_Aint extent;
89  cc = MPI_Type_get_extent(MPI_LONG, &lb, &extent);
90  assert(cc == MPI_SUCCESS);
91  assert(lb == 0 && extent == 8);
92 
93 #if 0
94  KMR_OMP_PARALLEL_
95  {
96  int tid = omp_get_thread_num();
97  assert(tid >= 0);
98  }
99 #endif
100 
101  return MPI_SUCCESS;
102 }
103 
104 /** Sets up the environment, and checks the constant definitions in C
105  and Fortran are consistent. */
106 
107 int
108 kmr_init_ff(int kf, struct kmr_option opt, struct kmr_file_option fopt)
109 {
110  union {struct kmr_option o; unsigned long i;} opt0 = {.o = opt};
111  union {struct kmr_file_option o; unsigned long i;} fopt0 = {.o = fopt};
112  opt0.o.rank_zero = 0;
113  fopt0.o.shuffle_names = 0;
114  assert(kf == KMR_KV_POINTER_UNMANAGED
115  && opt.rank_zero && fopt.shuffle_names
116  && opt0.i == 0 && fopt0.i == 0);
117  kmr_init_2(0);
118  return MPI_SUCCESS;
119 }
120 
121 /** Clears the environment. */
122 
123 int
124 kmr_fin(void)
125 {
126  return MPI_SUCCESS;
127 }
128 
129 /** Makes a new KMR context (a context has type KMR). A KMR context
130  is a record of common information to all key-value streams. COMM
131  is a communicator for use inside. It dups the given communicator
132  inside, to avoid conflicts with other calls to MPI functions. MPI
133  should be initialized with a thread support level of either
134  MPI_THREAD_SERIALIZED or MPI_THREAD_MULTIPLE. CONF specifies
135  configuration options. It should be freed after a call. The
136  options can differ on each rank, (in this version). The
137  configuration options are first taken from a file with a name
138  specified by the environment variable "KMROPTION" on rank0, and
139  they are merged with the explicitly given ones. The KMROPTION
140  file has the file format of Java properties (but only in Latin
141  characters). Refer to JDK documents on "java.util.Properties" (on
142  "load" method) for the file format. The explicitly given ones
143  have precedence. IDENTIFYING_NAME is just recorded in the
144  context, and has no specific use. It may be null. */
145 
/* Returns the new context.  The context owns a duplicate of COMM
   (mr->comm) and a fresh MPI_Info (mr->conf), which are both released
   again when the context is freed.  A failure of MPI_Comm_dup() aborts
   the job; the other MPI calls are only checked with assert().  With
   MPI_THREAD_SINGLE or MPI_THREAD_FUNNELED, creation still succeeds,
   and only a warning is issued when OpenMP may use several threads. */
146 KMR *
147 kmr_create_context(const MPI_Comm comm, const MPI_Info conf,
148  const char *identifying_name)
149 {
150  int cc;
151  KMR *mr = kmr_malloc(sizeof(struct kmr_ctx));
/* The record is zero-filled only under KMR_DEBUGX (presumably debug
   builds), so every field is set explicitly below. */
152  KMR_DEBUGX(memset(mr, 0, sizeof(struct kmr_ctx)));
153 
154  cc = MPI_Comm_size(comm, &mr->nprocs);
155  assert(cc == MPI_SUCCESS);
156  cc = MPI_Comm_rank(comm, &mr->rank);
157  assert(cc == MPI_SUCCESS);
158  cc = MPI_Comm_dup(comm, &mr->comm);
159  if (cc != MPI_SUCCESS) {
160  kmr_error_mpi(mr, "MPI_Comm_dup", cc);
161  MPI_Abort(MPI_COMM_WORLD, 1);
162  }
163 
/* Upper bound on OpenMP threads (1 without OpenMP), compared with the
   MPI thread-support level to warn about a mismatch. */
164 #ifdef _OPENMP
165  int omp_thrd = omp_get_thread_limit();
166 #else
167  int omp_thrd = 1;
168 #endif
169  assert(omp_thrd >= 1);
170 
171  int mpi_thrd;
172  cc = MPI_Query_thread(&mpi_thrd);
173  assert(cc == MPI_SUCCESS);
174  assert(mpi_thrd == MPI_THREAD_SINGLE
175  || mpi_thrd == MPI_THREAD_FUNNELED
176  || mpi_thrd == MPI_THREAD_SERIALIZED
177  || mpi_thrd == MPI_THREAD_MULTIPLE);
178  if (mpi_thrd == MPI_THREAD_SINGLE
179  || mpi_thrd == MPI_THREAD_FUNNELED) {
180  if (omp_thrd > 1) {
181  char ee[80];
182  char *s = ((mpi_thrd == MPI_THREAD_SINGLE)
183  ? "MPI_THREAD_SINGLE"
184  : "MPI_THREAD_FUNNELED");
185  snprintf(ee, sizeof(ee), "Thread support of MPI is low: %s", s);
186  kmr_warning(mr, 1, ee);
187  }
188  }
189 
/* Default settings; they may be overridden by kmr_check_options()
   from the merged configuration below. */
190  mr->kvses.head = 0;
191  mr->kvses.tail = 0;
192 
193  mr->ckpt_kvs_id_counter = 0;
194  mr->ckpt_ctx = 0;
195  mr->ckpt_enable = 0;
196  mr->ckpt_selective = 0;
197  mr->ckpt_no_fsync = 0;
198 
199  mr->kvt_ctx = 0;
200 
201  mr->log_traces = 0;
202  mr->atwork = 0;
203 
204  mr->spawn_size = 0;
205  mr->spawn_comms = 0;
206 
207  mr->mapper_park_size = MAP_PARK_SIZE;
208  mr->preset_block_size = BLOCK_SIZE;
209  mr->malloc_overhead = (int)sizeof(void *);
210 
211  mr->atoa_threshold = 512;
212 
213  mr->sort_trivial = 100000;
214  mr->sort_threshold = 100L;
215  mr->sort_sample_factor = 10000;
216  mr->sort_threads_depth = 5;
217 
218  mr->file_io_block_size = (1024 * 1024);
219 
220  mr->pushoff_block_size = PUSHOFF_SIZE;
221  mr->pushoff_poll_rate = 0;
222 
223 #if defined(KMRLIBDIR)
224  mr->kmr_installation_path = KMRLIBDIR;
225 #else
226  mr->kmr_installation_path = 0;
227 #endif
228  mr->spawn_watch_program = 0;
229  mr->spawn_watch_prefix = 0;
230  mr->spawn_watch_host_name = 0;
231  mr->spawn_max_processes = 0;
232  mr->spawn_watch_af = 4;
233  mr->spawn_watch_port_range[0] = 0;
234  mr->spawn_watch_port_range[1] = 0;
235  mr->spawn_gap_msec[0] = 1000;
236  mr->spawn_gap_msec[1] = 10000;
237  mr->spawn_watch_accept_onhold_msec = (60 * 1000);
238 
239  mr->verbosity = 5;
240 
241  mr->onk = 1;
242  mr->single_thread = 0;
243  mr->one_step_sort = 0;
244  mr->step_sync = 0;
245  mr->trace_sorting = 0;
246  mr->trace_file_io = 0;
247  mr->trace_map_ms = 0;
248  mr->trace_map_spawn = 0;
249  mr->trace_alltoall = 0;
250  mr->trace_kmrdp = 0;
251  mr->trace_iolb = 0;
252  mr->std_abort = 0;
253  mr->file_io_dummy_striping = 1;
254  mr->file_io_always_alltoallv = 0;
255  mr->spawn_sync_at_startup = 0;
256  mr->spawn_watch_all = 0;
257  mr->spawn_disconnect_early = 0;
258  mr->spawn_disconnect_but_free = 0;
259  mr->spawn_pass_intercomm_in_argument = 0;
260  mr->keep_fds_at_fork = 0;
261 
/* True only for MPI_THREAD_SERIALIZED or MPI_THREAD_MULTIPLE. */
262  mr->mpi_thread_support = (mpi_thrd == MPI_THREAD_SERIALIZED
263  || mpi_thrd == MPI_THREAD_MULTIPLE);
264 
265  mr->stop_at_some_check_globally = 0;
266  mr->pushoff_hang_out = 0;
267  mr->pushoff_fast_notice = 0;
268  mr->pushoff_stat = 1;
269  memset(&mr->pushoff_statistics, 0, sizeof(mr->pushoff_statistics));
270 
271  mr->kmrviz_trace = 0;
272 
/* strncpy() may leave the copy unterminated, so the last byte is
   cleared explicitly.  A name of KMR_JOB_NAME_LEN characters or more
   fails the assert, and it is truncated when NDEBUG is defined. */
273  if (identifying_name != 0) {
274  size_t s = strlen(identifying_name);
275  assert(s < KMR_JOB_NAME_LEN);
276  strncpy(mr->identifying_name, identifying_name, KMR_JOB_NAME_LEN);
277  mr->identifying_name[KMR_JOB_NAME_LEN - 1] = 0;
278  } else {
279  mr->identifying_name[0] = 0;
280  }
281 
282  /* KMR is now usable (with default setting). */
283 
284  /* Load and merge MPI infos. */
285 
/* The preferences from kmr_load_preference() are stored first.  The
   explicit CONF is then copied into the same info object, which gives
   the explicitly given options precedence, as described above. */
286  cc = MPI_Info_create(&mr->conf);
287  assert(cc == MPI_SUCCESS);
288  cc = kmr_load_preference(mr, mr->conf);
289  assert(cc == MPI_SUCCESS);
290  if (conf != MPI_INFO_NULL) {
291  cc = kmr_copy_mpi_info(conf, mr->conf);
292  assert(cc == MPI_SUCCESS);
293  }
294 
295  kmr_check_options(mr, mr->conf);
296 
/* NOTE(review): this listing is missing source lines 298 and 301,
   which presumably hold the checkpoint-context and KMRViz-trace
   initialization calls; confirm against the real source. */
297  /* Initialize checkpoint context. */
299 
300  /* Initialize KMRViz trace */
302 
303  return mr;
304 }
305 
306 KMR *
307 kmr_create_context_world()
308 {
309  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, "");
310  return mr;
311 }
312 
313 KMR *
314 kmr_create_context_ff(const int fcomm, const int finfo,
315  const char *identifying_name)
316 {
317  MPI_Comm comm = MPI_Comm_f2c(fcomm);
318  MPI_Info info = MPI_Info_f2c(finfo);
319  KMR *mr = kmr_create_context(comm, info, identifying_name);
320  return mr;
321 }
322 
323 /** Releases a context created with kmr_create_context(). */
/* Steps, in order:
   - Key-value streams still linked to the context are only reported
     (with their allocation site when it is recorded); they are not
     freed.
   - The KMRViz trace is finalized.
   - The trace log file is closed; a close failure is only warned.
   - The duplicated communicator and the configuration info are freed.
   - spawn_watch_program is freed.  The installation path and the
     spawn-watch prefix and host name are not freed here.
   - The context record itself is released.
   It returns MPI_SUCCESS.
   NOTE(review): this listing is missing source line 326 (presumably
   the declarator "kmr_free_context(KMR *mr)") and line 352
   (presumably the checkpoint-context release); confirm against the
   real source. */
324 
325 int
327 {
328  int cc;
329  if (mr->kvses.head != 0 || mr->kvses.tail != 0) {
330  kmr_warning(mr, 1, "Some key-value streams remain unfreed");
331  for (KMR_KVS *p = mr->kvses.head; p != 0; p = p->c.link.next) {
332  if (!KMR_KVS_MAGIC_OK(p->c.magic)) {
333  kmr_warning(mr, 1, "- unfreed kvs in bad state");
334  } else if (p->c.magic == KMR_KVS_ONCORE) {
335  if (p->c.info_line0.file != 0) {
336  char ee[80];
337  snprintf(ee, 80, "- kvs allocated at %s:%d: %s",
338  p->c.info_line0.file, p->c.info_line0.line,
339  p->c.info_line0.func);
340  kmr_warning(mr, 1, ee);
341  }
342  } else {
343  kmr_warning(mr, 1, "- unfreed kvs in bad state");
344  }
345  }
346  }
347 
348  /* Finalize KMRViz trace */
349  kmr_trace_finalize(mr);
350 
351  /* Free checkpoint context. */
353 
354  if (mr->log_traces != 0) {
355  cc = fclose(mr->log_traces);
356  if (cc == EOF) {
357  char ee[80];
358  char *m = strerror(errno);
359  snprintf(ee, sizeof(ee), "Closing log file failed: %s", m);
360  kmr_warning(mr, 1, ee);
361  }
362  mr->log_traces = 0;
363  }
364 
365  cc = MPI_Comm_free(&mr->comm);
366  assert(cc == MPI_SUCCESS);
367  if (mr->conf != MPI_INFO_NULL) {
368  cc = MPI_Info_free(&mr->conf);
369  assert(cc == MPI_SUCCESS);
370  }
371 
/* The program name is freed with its string size (including NUL). */
372  if (mr->spawn_watch_program != 0) {
373  size_t s = (strlen(mr->spawn_watch_program) + 1);
374  kmr_free(mr->spawn_watch_program, s);
375  }
376  assert(mr->spawn_comms == 0);
377  /*mr->kmr_installation_path;*/
378  /*mr->spawn_watch_prefix;*/
379  /*mr->spawn_watch_host_name;*/
380 
381  kmr_free(mr, sizeof(struct kmr_ctx));
382  return MPI_SUCCESS;
383 }
384 
385 KMR *
386 kmr_get_context_of_kvs(KMR_KVS const *kvs)
387 {
388  KMR *mr = kvs->c.mr;
389  return mr;
390 }
391 
392 /* Unlinks a key-value stream from a list on a context. */
393 
394 static inline void
395 kmr_unlink_kvs(KMR_KVS *kvs)
396 {
397  KMR *mr = kvs->c.mr;
398  KMR_KVS *prev = kvs->c.link.prev;
399  KMR_KVS *next = kvs->c.link.next;
400  if (prev != 0) {
401  prev->c.link.next = next;
402  } else {
403  assert(mr->kvses.head == kvs);
404  mr->kvses.head = next;
405  }
406  if (next != 0) {
407  next->c.link.prev = prev;
408  } else {
409  assert(mr->kvses.tail == kvs);
410  mr->kvses.tail = prev;
411  }
412 }
413 
414 /** Makes a new key-value stream (type KMR_KVS). It allocates by the
415  size of the union, which may be larger than the necessary. */
416 
417 static KMR_KVS *
418 kmr_create_raw_kvs(KMR *mr, const KMR_KVS *_similar)
419 {
420  xassert(mr != 0);
421  /*assert(similar->c.mr == mr);*/
422  KMR_KVS *kvs = kmr_malloc(sizeof(KMR_KVS));
423  KMR_DEBUGX(memset(kvs, 0, sizeof(KMR_KVS)));
424  kvs->c.magic = KMR_KVS_ONCORE;
425  kvs->c.mr = mr;
426  kmr_link_kvs(kvs);
427 
428  if (kmr_ckpt_enabled(mr)) {
429  mr->ckpt_kvs_id_counter++;
430  kvs->c.ckpt_kvs_id = mr->ckpt_kvs_id_counter;
431  kvs->c.ckpt_generated_op = 0;
432  kvs->c.ckpt_consumed_op = 0;
433  }
434 
435  kvs->c.key_data = KMR_KV_BAD;
436  kvs->c.value_data = KMR_KV_BAD;
437  kvs->c.element_count = 0;
438 
439  kvs->c.oncore = 1;
440  kvs->c.stowed = 0;
441  kvs->c.nogrow = 0;
442  kvs->c.sorted = 0;
443  kvs->c.shuffled_in_pushoff = 0;
444  kvs->c._uniformly_sized_ = 0;
445 
446  kvs->c.block_size = (mr->preset_block_size - mr->malloc_overhead);
447  kvs->c.element_size_limit = (kvs->c.block_size / 4);
448  kvs->c.storage_netsize = 0;
449  kvs->c.block_count = 0;
450  kvs->c.first_block = 0;
451  kvs->c.ms = 0;
452 
453  /* Transient fields: */
454 
455  kvs->c.under_threaded_operation = 0;
456  kvs->c.current_block = 0;
457  kvs->c.adding_point = 0;
458  kvs->c.temporary_data = 0;
459 
460  //KMR_OMP_INIT_LOCK(&kvs->c.mutex);
461 
462  return kvs;
463 }
464 
465 /* Clears the slots of the structure. It keeps the fields of the a
466  link and checkpointing. */
467 
468 void
469 kmr_init_kvs_oncore(KMR_KVS *kvs, KMR *mr)
470 {
471  assert(mr != 0);
472  kvs->c.magic = KMR_KVS_ONCORE;
473  kvs->c.mr = mr;
474  /*kmr_link_kvs(kvs);*/
475 
476  /*kvs->c.ckpt_kvs_id = 0;*/
477  /*kvs->c.ckpt_generated_op = 0;*/
478  /*kvs->c.ckpt_consumed_op = 0;*/
479 
480  kvs->c.key_data = KMR_KV_BAD;
481  kvs->c.value_data = KMR_KV_BAD;
482  kvs->c.element_count = 0;
483 
484  kvs->c.oncore = 1;
485  kvs->c.stowed = 0;
486  kvs->c.nogrow = 0;
487  kvs->c.sorted = 0;
488  kvs->c.shuffled_in_pushoff = 0;
489  kvs->c._uniformly_sized_ = 0;
490 
491  kvs->c.block_size = (mr->preset_block_size - mr->malloc_overhead);
492  kvs->c.element_size_limit = (kvs->c.block_size / 4);
493  kvs->c.storage_netsize = 0;
494  kvs->c.block_count = 0;
495  kvs->c.first_block = 0;
496  kvs->c.ms = 0;
497 
498  /* Transient fields: */
499 
500  kvs->c.under_threaded_operation = 0;
501  kvs->c.current_block = 0;
502  kvs->c.adding_point = 0;
503  kvs->c.temporary_data = 0;
504 }
505 
506 /** Makes a new key-value stream with the specified field
507  data-types. */
/* Creates the stream with kmr_create_raw_kvs() and sets the key and
   value data-types to KF and VF.  It records the call site (FILE, LINE,
   FUNC) in info_line0, which the context's unfreed-stream report
   prints.  It returns the new stream.
   NOTE(review): this listing is missing source line 510, which must
   declare the function name and the MR, KF and VF parameters used
   below, and line 522 inside the checkpoint branch; confirm against
   the real source. */
508 
509 KMR_KVS *
511  struct kmr_option opt,
512  const char *file, const int line, const char *func)
513 {
514  KMR_KVS *kvs = kmr_create_raw_kvs(mr, 0);
515  kvs->c.key_data = kf;
516  kvs->c.value_data = vf;
517  kvs->c.info_line0.file = file;
518  kvs->c.info_line0.func = func;
519  kvs->c.info_line0.line = line;
520 
521  if (kmr_ckpt_enabled(mr)) {
523  }
524 
525  return kvs;
526 }
527 
528 /** Moves the contents of the input KVI to the output KVO. It
529  consumes the input KVI. Calling kmr_map() with a null
530  map-function has the same effect. Effective-options: TAKE_CKPT.
531  See struct kmr_option. */
532 
533 int
534 kmr_move_kvs(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
535 {
536  assert(kvi != 0 && kvo != 0
537  && kvi->c.magic == KMR_KVS_ONCORE
538  && kvo->c.magic == KMR_KVS_ONCORE
539  && kvi->c.oncore && kvo->c.oncore);
540  assert(kvi->c.key_data == kvo->c.key_data
541  && kvi->c.value_data == kvo->c.value_data);
542  assert(kvi->c.stowed && !kvo->c.stowed);
543  // struct kmr_option kmr_supported = {.take_ckpt = 1};
544  // kmr_check_fn_options(kvo->c.mr, kmr_supported, opt, __func__);
545 
546  if (kmr_ckpt_enabled(kvo->c.mr)) {
547  if (kmr_ckpt_progress_init(kvi, kvo, opt)) {
548  kvi->c.first_block = 0;
549  kvi->c.ms = 0;
550  kmr_free_kvs(kvi);
551  return MPI_SUCCESS;
552  }
553  }
554 
555  /* Copy state. */
556 
557  kvo->c.stowed = kvi->c.stowed;
558  kvo->c.nogrow = kvi->c.nogrow;
559  kvo->c.sorted = kvi->c.sorted;
560  kvo->c.element_count = kvi->c.element_count;
561  kvo->c.storage_netsize = kvi->c.storage_netsize;
562  kvo->c.block_count = kvi->c.block_count;
563  kvo->c.first_block = kvi->c.first_block;
564  kvo->c.ms = kvi->c.ms;
565 
566  if (kmr_ckpt_enabled(kvo->c.mr)) {
567  kmr_ckpt_save_kvo_whole(kvo->c.mr, kvo);
568  }
569 
570  /* Dispose of input. */
571 
572  kvi->c.first_block = 0;
573  kvi->c.ms = 0;
574  int cc = kmr_free_kvs(kvi);
575  assert(cc == MPI_SUCCESS);
576 
577  if (kmr_ckpt_enabled(kvo->c.mr)) {
578  kmr_ckpt_progress_fin(kvo->c.mr);
579  }
580  return MPI_SUCCESS;
581 }
582 
/* Releases an on-core key-value stream.  It frees the chain of storage
   blocks, the map_ms state (sized by the element count), the
   temporary data, and finally the record itself.  The magic is set to
   KMR_KVS_BAD before the checkpoint branch runs.  It does not unlink
   the stream; kmr_free_kvs() does that before calling this.  It
   returns MPI_SUCCESS.
   NOTE(review): this listing is missing source line 606 inside the
   checkpoint branch (presumably the checkpoint-file removal); confirm
   against the real source. */
583 static inline int
584 kmr_free_kvs_oncore(KMR_KVS *kvs)
585 {
586  struct kmr_kvs_block *b = kvs->c.first_block;
587  while (b != 0) {
588  struct kmr_kvs_block *bn = b->next;
589  kmr_free(b, b->size);
590  b = bn;
591  }
592  if (kvs->c.ms != 0) {
593  long cnt = kvs->c.element_count;
594  size_t sz = (sizeof(struct kmr_map_ms_state)
595  + (sizeof(char) * (size_t)cnt));
596  kmr_free(kvs->c.ms, sz);
597  }
598  if (kvs->c.temporary_data != 0) {
599  kmr_free(kvs->c.temporary_data, 0);
600  }
601  kvs->c.magic = KMR_KVS_BAD;
602 
603  /* Delete checkpoint file if exists. */
604 
605  if (kmr_ckpt_enabled(kvs->c.mr)) {
607  }
608 
609  kmr_free(kvs, sizeof(struct kmr_kvs_oncore));
610  return MPI_SUCCESS;
611 }
612 
613 /** Releases a key-value stream (type KMR_KVS). Normally,
614  mapper/shuffler/reducer consumes and frees the input key-value
615  stream, and explicit calls are unnecessary. Here,
616  mapper/shuffler/reducer includes kmr_map(),
617  kmr_map_on_rank_zero(), kmr_map_ms(), kmr_shuffle(),
618  kmr_replicate(), kmr_reduce(), and kmr_reduce_as_one(). */
/* A stream with a bad magic (already freed or corrupted) is reported
   with kmr_error().  Otherwise the stream is unlinked from its context
   and freed according to its kind: on-core or push-off.  It returns
   the result of the kind-specific free routine.
   NOTE(review): this listing is missing source line 621, presumably
   the declarator "kmr_free_kvs(KMR_KVS *kvs)"; confirm against the
   real source. */
619 
620 int
622 {
623  if (!KMR_KVS_MAGIC_OK(kvs->c.magic)) {
624  kmr_error(0, "kmr_free_kvs: kvs already freed or corrupted");
625  }
626  kmr_unlink_kvs(kvs);
627 
628  int cc;
629  if (kvs->c.magic == KMR_KVS_ONCORE) {
630  cc = kmr_free_kvs_oncore(kvs);
631  return cc;
632  } else if (kvs->c.magic == KMR_KVS_PUSHOFF) {
633  cc = kmr_free_kvs_pushoff(kvs, 1);
634  return cc;
635  } else {
636  assert((kvs->c.magic == KMR_KVS_ONCORE)
637  || (kvs->c.magic == KMR_KVS_PUSHOFF));
638  assert(NEVERHERE);
639  return 0;
640  }
641 }
642 
643 /* ================================================================ */
644 
645 /* Allocates a new block of storage as a current-block. When the
646  SIZE=1, it allocates a block by the pre-specified block-size, and
647  allows it to glow incrementally. When the SIZE!=1, it allocates a
648  block by that size after increasing it for the spaces of a header
649  and an end-of-block marker. It sets the STORAGE_NETSIZE field, and
650  places an end-of-block marker, because all key-value pairs should
651  fit in the given size. It accepts zero as a legitimate size. */
652 
653 int
654 kmr_allocate_block(KMR_KVS *kvs, size_t size)
655 {
656  if (size != 1) {
657  assert(kvs->c.element_count == 0 && kvs->c.storage_netsize == 0
658  && kvs->c.block_count == 0 && kvs->c.first_block == 0
659  && kvs->c.current_block == 0 && kvs->c.adding_point == 0);
660  }
661  size_t netsz;
662  size_t sz;
663  if (size == 0) {
664  kvs->c.block_size = 0;
665  kvs->c.nogrow = 1;
666  return MPI_SUCCESS;
667  } else if (size == 1) {
668  netsz = 0;
669  sz = kvs->c.block_size;
670  assert(kvs->c.nogrow == 0);
671  } else {
672  assert(kvs->c.first_block == 0 && kvs->c.current_block == 0
673  && kvs->c.block_count == 0 && kvs->c.adding_point == 0);
674  assert(size >= kmr_kvs_entry_header);
675  netsz = size;
676  sz = (netsz + kmr_kvs_block_header + kmr_kvs_entry_header);
677  kvs->c.block_size = sz;
678  kvs->c.storage_netsize = netsz;
679  kvs->c.nogrow = 1;
680  }
681  struct kmr_kvs_block *b = kmr_malloc(sz);
682  kmr_kvs_reset_block(kvs, b, sz, netsz);
683  kmr_kvs_insert_block(kvs, b);
684  return MPI_SUCCESS;
685 }
686 
687 /* Adjusts the adding-point to the end for putting key-value pairs at
688  once. It is only called when the whole key-value stream size is
689  known in advance. */
690 
691 static inline void
692 kmr_kvs_adjust_adding_point(KMR_KVS *kvs)
693 {
694  if (kvs->c.block_count == 0) {
695  assert(kvs->c.current_block == 0 && kvs->c.adding_point == 0);
696  } else {
697  assert(kvs->c.current_block != 0 && kvs->c.adding_point != 0);
698  struct kmr_kvs_block *b = kvs->c.current_block;
699  assert(kmr_kvs_first_entry(kvs, b) == kvs->c.adding_point);
700  kvs->c.adding_point = kmr_kvs_last_entry_limit(kvs, b);
701  assert(kvs->c.adding_point == kmr_kvs_adding_point(b));
702  }
703 }
704 
705 /* Adds a key-value pair. It is a body of kmr_add_kv(), without a
706  mutex. It modifies kmr_kv_box XKV (when non-null) to return the
707  pointers to the opaque fields, when a key or a value is opaque. It
708  does not move actual data when RESERVE_SPACE_ONLY=1. */
709 
710 static inline int
711 kmr_add_kv_nomutex(KMR_KVS *kvs, const struct kmr_kv_box kv,
712  struct kmr_kv_box *xkv, _Bool reserve_space_only)
713 {
714  kmr_assert_kv_sizes(kvs, kv);
715  assert(!kvs->c.nogrow || kvs->c.storage_netsize != 0);
716  KMR *mr = kvs->c.mr;
717  int cc;
718  size_t sz = kmr_kvs_entry_size_of_box(kvs, kv);
719  if (sz > (kvs->c.element_size_limit)) {
720  char ee[80];
721  snprintf(ee, 80, "key-value too large (size=%zd)", sz);
722  kmr_error(mr, ee);
723  }
724  if (kvs->c.first_block == 0) {
725  assert(kvs->c.element_count == 0);
726  cc = kmr_allocate_block(kvs, 1);
727  assert(cc == MPI_SUCCESS);
728  }
729  if (!kmr_kvs_entry_fits_in_block(kvs, kvs->c.current_block, sz)) {
730  assert(!kvs->c.nogrow);
731  kmr_kvs_mark_entry_tail(kvs->c.adding_point);
732  cc = kmr_allocate_block(kvs, 1);
733  assert(cc == MPI_SUCCESS);
734  }
735  struct kmr_kvs_entry *e = kvs->c.adding_point;
736  kmr_poke_kv(e, kv, xkv, kvs, reserve_space_only);
737  if (!kvs->c.nogrow) {
738  kvs->c.storage_netsize += kmr_kvs_entry_netsize(e);
739  }
740  kvs->c.current_block->partial_element_count++;
741  kvs->c.current_block->fill_size += kmr_kvs_entry_size(kvs, e);
742  kvs->c.adding_point = kmr_kvs_next_entry(kvs, e);
743  kvs->c.element_count++;
744  return MPI_SUCCESS;
745 }
746 
747 /** Adds a key-value pair. (It is with serialization when a
748  map-function is threaded). */
749 
750 int
751 kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
752 {
753  kmr_assert_kvs_ok(0, kvs, 0, 1);
754  int cc;
755  if (kvs->c.magic == KMR_KVS_ONCORE) {
756  KMR_OMP_CRITICAL_
757  {
758  cc = kmr_add_kv_nomutex(kvs, kv, 0, 0);
759  }
760  return cc;
761  } else if (kvs->c.magic == KMR_KVS_PUSHOFF) {
762  KMR_OMP_CRITICAL_
763  {
764  cc = kmr_add_kv_pushoff(kvs, kv);
765  }
766  return cc;
767  } else {
768  assert((kvs->c.magic == KMR_KVS_ONCORE)
769  || (kvs->c.magic == KMR_KVS_PUSHOFF));
770  assert(NEVERHERE);
771  return 0;
772  }
773 }
774 
775 /** Adds a key-value pair as given directly by a pointer. An integer
776  or a double be passed by a pointer (thus like &v). */
777 
778 int
779 kmr_add_kv1(KMR_KVS *kvs, void *k, int klen, void *v, int vlen)
780 {
781  union kmr_unit_sized xk;
782  switch (kvs->c.key_data) {
783  case KMR_KV_BAD:
784  xassert(kvs->c.key_data != KMR_KV_BAD);
785  xk.i = 0;
786  break;
787  case KMR_KV_INTEGER:
788  xk.i = *(long *)k;
789  break;
790  case KMR_KV_FLOAT8:
791  xk.d = *(double *)k;
792  break;
793  case KMR_KV_OPAQUE:
794  case KMR_KV_CSTRING:
795  case KMR_KV_POINTER_OWNED:
796  case KMR_KV_POINTER_UNMANAGED:
797  xk.p = k;
798  break;
799  default:
800  xassert(NEVERHERE);
801  xk.i = 0;
802  break;
803  }
804 
805  union kmr_unit_sized xv;
806  switch (kvs->c.value_data) {
807  case KMR_KV_BAD:
808  xassert(kvs->c.value_data != KMR_KV_BAD);
809  xv.i = 0;
810  break;
811  case KMR_KV_INTEGER:
812  xv.i = *(long *)v;
813  break;
814  case KMR_KV_FLOAT8:
815  xv.d = *(double *)v;
816  break;
817  case KMR_KV_OPAQUE:
818  case KMR_KV_CSTRING:
819  case KMR_KV_POINTER_OWNED:
820  case KMR_KV_POINTER_UNMANAGED:
821  xv.p = v;
822  break;
823  default:
824  xassert(NEVERHERE);
825  xv.i = 0;
826  break;
827  }
828 
829  struct kmr_kv_box kv = {.klen = klen, .vlen = vlen, .k = xk, .v = xv};
830  int cc;
831  cc = kmr_add_kv(kvs, kv);
832  return cc;
833 }
834 
835 /** Adds a key-value pair, but only allocates a space and returns the
836  pointers to the key and the value parts. It may enable to create
837  a large key/value data directly in the space. It does not return
838  a proper value if a key/value field is not a pointer. (It cannot
839  be used with a "push-off" key-value stream, because its buffer
840  will be sent out and late fill-in the buffer causes a race). */
841 
842 int
843 kmr_add_kv_space(KMR_KVS *kvs, const struct kmr_kv_box kv,
844  void **keyp, void **valuep)
845 {
846  kmr_assert_kvs_ok(0, kvs, 0, 1);
847  assert(kvs->c.magic == KMR_KVS_ONCORE);
848  int cc;
849  struct kmr_kv_box xkv = {
850  .k.p = 0,
851  .v.p = 0
852  };
853  KMR_OMP_CRITICAL_
854  {
855  cc = kmr_add_kv_nomutex(kvs, kv, &xkv, 1);
856  }
857  if (keyp != 0) {
858  *keyp = (void *)xkv.k.p;
859  }
860  if (valuep != 0) {
861  *valuep = (void *)xkv.v.p;
862  }
863  return cc;
864 }
865 
866 int
867 kmr_add_kv_quick_(KMR_KVS *kvs, const struct kmr_kv_box kv)
868 {
869  int cc = kmr_add_kv_nomutex(kvs, kv, 0, 0);
870  return cc;
871 }
872 
873 /** Marks finished adding key-value pairs. Further addition will be
874  prohibited. Normally, mapper/shuffler/reducer finishes the output
875  key-value stream by itself, and explicit calls are unnecessary.
876  Here, mapper/shuffler/reducer includes kmr_map(),
877  kmr_map_on_rank_zero(), kmr_map_ms(), kmr_shuffle(),
878  kmr_replicate(), and kmr_reduce(). */
/* For an on-core stream, a second call is reported with kmr_error().
   Otherwise the end-of-block marker is placed at the adding-point when
   there are entries, the stream is marked stowed, and the current
   block and the adding-point are cleared.  It returns MPI_SUCCESS (or
   0 for a stream of unknown kind).
   NOTE(review): this listing is missing source line 881, presumably
   the declarator "kmr_add_kv_done(KMR_KVS *kvs)", and line 899, the
   push-off branch body; confirm against the real source. */
879 
880 int
882 {
883  kmr_assert_kvs_ok(0, kvs, 0, 1);
884  if (kvs->c.magic == KMR_KVS_ONCORE) {
885  if (kvs->c.stowed) {
886  kmr_error(kvs->c.mr, "kmr_add_kv_done: may be called already");
887  }
888  if (kvs->c.element_count == 0) {
889  assert(kvs->c.current_block == 0 && kvs->c.adding_point == 0);
890  } else {
891  assert(kvs->c.current_block != 0 && kvs->c.adding_point != 0);
892  kmr_kvs_mark_entry_tail(kvs->c.adding_point);
893  }
894  kvs->c.stowed = 1;
895  kvs->c.current_block = 0;
896  kvs->c.adding_point = 0;
897  assert(kvs->c.block_count == 0 || kvs->c.first_block != 0);
898  } else if (kvs->c.magic == KMR_KVS_PUSHOFF) {
900  } else {
901  assert((kvs->c.magic == KMR_KVS_ONCORE)
902  || (kvs->c.magic == KMR_KVS_PUSHOFF));
903  assert(NEVERHERE);
904  return 0;
905  }
906  return MPI_SUCCESS;
907 }
908 
909 /** Adds a key-value pair of strings. The key and value fields should
910  be of opaque data. */
911 
912 int
913 kmr_add_string(KMR_KVS *kvs, const char *k, const char *v)
914 {
915  if (!((kvs->c.key_data == KMR_KV_OPAQUE
916  || kvs->c.key_data == KMR_KV_CSTRING)
917  && (kvs->c.value_data == KMR_KV_OPAQUE
918  || kvs->c.value_data == KMR_KV_CSTRING))) {
919  kmr_error(kvs->c.mr,
920  "key-value data-types need be opaque for strings");
921  }
922  size_t klen = (strlen(k) + 1);
923  size_t vlen = (strlen(v) + 1);
924  assert(klen <= INT_MAX && vlen <= INT_MAX);
925  struct kmr_kv_box kv;
926  kv.klen = (int)klen;
927  kv.k.p = k;
928  kv.vlen = (int)vlen;
929  kv.v.p = v;
930  int cc = kmr_add_kv(kvs, kv);
931  return cc;
932 }
933 
934 /** Adds a given key-value pair unmodified. It is a map-function. */
/* It adds KV to KVO, ignoring KVI, ARG and the index I, and always
   returns MPI_SUCCESS; the return value of kmr_add_kv() is not
   checked.  kmr_collapse_as_opaque() uses it with kmr_map().
   NOTE(review): this listing is missing source line 937, presumably
   the declarator starting "kmr_add_identity_fn(const struct kmr_kv_box
   kv,"; confirm against the real source. */
935 
936 int
938  const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long i)
939 {
940  kmr_add_kv(kvo, kv);
941  return MPI_SUCCESS;
942 }
943 
944 /* Packs fields as opaque. When key or value field is a pointer,
945  fields need to be packed to make data-exchanges easy. */
946 
947 static int
948 kmr_collapse_as_opaque(KMR_KVS *kvi, KMR_KVS *kvo, _Bool inspectp)
949 {
950  assert(kvi != 0 && kvo != 0);
951  assert(kmr_fields_pointer_p(kvi) || kvi->c.block_count > 1);
952  int cc;
953  cc = kmr_allocate_block(kvo, kvi->c.storage_netsize);
954  assert(cc == MPI_SUCCESS);
955  struct kmr_option collapse = {.collapse = 1, .inspect = inspectp};
956  cc = kmr_map(kvi, kvo, 0, collapse, kmr_add_identity_fn);
957  assert(cc == MPI_SUCCESS);
958  return MPI_SUCCESS;
959 }
960 
961 /** Packs locally the contents of a key-value stream to a byte array.
962  It is used to save or to send a key-value stream. It returns the
963  allocated memory with its size, and it should be freed by the
964  user. It may fail on allocating a buffer, and then it returns
965  MPI_ERR_BUFFER. Its reverse is performed by kmr_restore_kvs(). */
966 
967 int
968 kmr_save_kvs(KMR_KVS *kvs, void **dataq, size_t *szq,
969  struct kmr_option opt)
970 {
971  if (kvs == 0) {
972  kmr_error_at_site(0, "Null input kvs", 0);
973  } else if (!KMR_KVS_MAGIC_OK(kvs->c.magic)) {
974  kmr_error_at_site(0, "Bad input kvs (freed or corrupted)", 0);
975  }
976  assert(kvs->c.magic == KMR_KVS_ONCORE);
977  kmr_check_fn_options(kvs->c.mr, kmr_noopt, opt, __func__);
978  /*assert(kvs->c.current_block == 0 && kvs->c.adding_point == 0);*/
979  if (kvs->c.ms != 0 || kvs->c.temporary_data != 0) {
980  kmr_warning(kvs->c.mr, 5,
981  "Some fields in KVS may be lost in saved image");
982  }
983  int cc;
984  if (kmr_fields_pointer_p(kvs) || (kvs->c.block_count > 1)) {
985  enum kmr_kv_field keyf = kmr_unit_sized_or_opaque(kvs->c.key_data);
986  enum kmr_kv_field valf = kmr_unit_sized_or_opaque(kvs->c.value_data);
987  KMR_KVS *kvs1 = kmr_create_kvs(kvs->c.mr, keyf, valf);
988  cc = kmr_collapse_as_opaque(kvs, kvs1, 0);
989  assert(cc == MPI_SUCCESS);
990  assert(!kmr_fields_pointer_p(kvs1) && kvs->c.block_count <= 1);
991  cc = kmr_save_kvs(kvs1, dataq, szq, kmr_noopt);
992  assert(cc == MPI_SUCCESS);
993  kmr_free_kvs(kvs1);
994  return MPI_SUCCESS;
995  }
996  assert(!kmr_fields_pointer_p(kvs));
997  size_t netsz = kvs->c.storage_netsize;
998  size_t blocksz = (netsz + kmr_kvs_block_header + kmr_kvs_entry_header);
999  size_t sz = (sizeof(KMR_KVS) + blocksz);
1000  unsigned char *b = malloc(sz);
1001  if (b == 0) {
1002  return MPI_ERR_BUFFER;
1003  }
1004  KMR_KVS *h = (void *)b;
1005  struct kmr_kvs_block *s = (void *)(b + sizeof(KMR_KVS));
1006  memcpy(h, kvs, sizeof(KMR_KVS));
1007  h->c.magic = KMR_KVS_ONCORE_PACKED;
1008  h->c.mr = 0;
1009  h->c.link.next = 0;
1010  h->c.link.prev = 0;
1011  h->c.block_count = 1;
1012  h->c.first_block = 0;
1013  h->c.current_block = 0;
1014  h->c.adding_point = 0;
1015  h->c.ms = 0;
1016  h->c.temporary_data = 0;
1017  if (kvs->c.block_count == 0) {
1018  /*nothing*/
1019  } else if (kvs->c.block_count == 1) {
1020  memcpy(s, kvs->c.first_block, blocksz);
1021  s->size = blocksz;
1022  } else {
1023  xassert(NEVERHERE);
1024  }
1025  *dataq = b;
1026  *szq = sz;
1027  return MPI_SUCCESS;
1028 }
1029 
1030 /** Unpacks locally the contents of a key-value stream from a byte
1031  array. It is a reverse of kmr_save_kvs(). */
1032 
1033 int
1034 kmr_restore_kvs(KMR_KVS *kvo, void *data, size_t sz_,
1035  struct kmr_option opt)
1036 {
1037  assert(kvo != 0 && kvo->c.magic == KMR_KVS_ONCORE);
1038  kmr_check_fn_options(kvo->c.mr, kmr_noopt, opt, __func__);
1039  int cc;
1040  unsigned char *b = data;
1041  KMR_KVS *h = (void *)b;
1042  unsigned char *s = (b + sizeof(KMR_KVS));
1043  if (h->c.magic != KMR_KVS_ONCORE_PACKED) {
1044  kmr_warning(kvo->c.mr, 1, "Bad packed data, magic mismatch");
1045  return MPI_ERR_TYPE;
1046  }
1047  size_t netsz = h->c.storage_netsize;
1048  size_t blocksz = (netsz + kmr_kvs_block_header + kmr_kvs_entry_header);
1049  cc = kmr_allocate_block(kvo, netsz);
1050  assert(cc == MPI_SUCCESS);
1051  if (netsz != 0) {
1052  memcpy(kvo->c.first_block, s, blocksz);
1053  }
1054  kvo->c.key_data = h->c.key_data;
1055  kvo->c.value_data = h->c.value_data;
1056  assert(kvo->c.sorted == 0);
1057  kvo->c.element_count = h->c.element_count;
1058  kmr_kvs_adjust_adding_point(kvo);
1059  kmr_add_kv_done(kvo);
1060  return MPI_SUCCESS;
1061 }
1062 
1063 /* ================================================================ */
1064 
1065 /* Calls a map-function on entries in aggregate. EV holds EVCNT
1066  entries. MAPCOUNT is a counter of mapped entries from the
1067  beginning. */
1068 
1069 static inline int
1070 kmr_map_parked(struct kmr_kv_box *ev, long evcnt, long mapcount,
1071  _Bool k_reclaim, _Bool v_reclaim,
1072  KMR_KVS *kvi, KMR_KVS *kvo, kmr_mapfn_t m,
1073  void *arg, struct kmr_option opt)
1074 {
1075  int cc;
1076  KMR *mr = kvi->c.mr;
1077  long cnt = kvi->c.element_count;
1078  if (mr->single_thread || opt.nothreading) {
1079  for (long i = 0; i < evcnt; i++) {
1080  double t0 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
1081  cc = (*m)(ev[i], kvi, kvo, arg, (mapcount + i));
1082  double t1 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
1083  if (cc != MPI_SUCCESS) {
1084  char ee[80];
1085  snprintf(ee, sizeof(ee),
1086  "Map-fn returned with error cc=%d", cc);
1087  kmr_error(mr, ee);
1088  }
1089  if (mr->log_traces != 0) {
1090  kmr_log_map(mr, kvi, &ev[i], (mapcount + 1), cnt,
1091  m, (t1 - t0));
1092  }
1093  }
1094  } else {
1095  if (kvo != 0) {
1096  kvo->c.under_threaded_operation = 1;
1097  }
1098  KMR_OMP_PARALLEL_FOR_
1099  for (long i = 0; i < evcnt; i++) {
1100  double t0 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
1101  int ccx = (*m)(ev[i], kvi, kvo, arg, (mapcount + i));
1102  double t1 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
1103  if (ccx != MPI_SUCCESS) {
1104  char ee[80];
1105  snprintf(ee, sizeof(ee),
1106  "Map-fn returned with error cc=%d", ccx);
1107  kmr_error(mr, ee);
1108  }
1109  if (mr->log_traces != 0) {
1110  kmr_log_map(mr, kvi, &ev[i], (mapcount + 1), cnt,
1111  m, (t1 - t0));
1112  }
1113  }
1114  if (kvo != 0) {
1115  kvo->c.under_threaded_operation = 0;
1116  }
1117  }
1118  for (long i = 0; i < evcnt; i++) {
1119  if (k_reclaim) {
1120  kmr_free((void *)ev[i].k.p, (size_t)ev[i].klen);
1121  }
1122  if (v_reclaim) {
1123  kmr_free((void *)ev[i].v.p, (size_t)ev[i].vlen);
1124  }
1125  }
1126  return MPI_SUCCESS;
1127 }
1128 
1129 /** Maps by skipping the number of entries. It calls a map-function
1130  on entries from FROM, skipping by STRIDE, up to LIMIT
1131  non-inclusive. See kmr_map(). */
1132 
1133 int
1134 kmr_map_skipping(long from, long stride, long limit,
1135  _Bool stop_when_some_added,
1136  KMR_KVS *kvi, KMR_KVS *kvo,
1137  void *arg, struct kmr_option opt, kmr_mapfn_t m)
1138 {
1139  kmr_assert_kvs_ok(kvi, kvo, 1, 0);
1140  assert(from >= 0 && stride > 0 && limit >= 0);
1141  assert(kvi->c.current_block == 0);
1142  limit = ((limit != 0) ? limit : LONG_MAX);
1143  KMR *mr = kvi->c.mr;
1144 
1145  if (kmr_ckpt_enabled(mr)) {
1146  if (kmr_ckpt_progress_init(kvi, kvo, opt)){
1147  if (kvo != 0 && !opt.keep_open) {
1148  kmr_add_kv_done(kvo);
1149  }
1150  if (!opt.inspect) {
1151  kmr_free_kvs(kvi);
1152  }
1153  return MPI_SUCCESS;
1154  }
1155  from = kmr_ckpt_first_unprocessed_kv(mr);
1157  }
1158 
1159  int cc;
1160 
1161  if (mr->step_sync) {
1162  cc = MPI_Barrier(mr->comm);
1163  assert(MPI_SUCCESS);
1164  }
1165  if (kvo != 0 && opt.collapse) {
1166  assert(!kmr_fields_pointer_p(kvo));
1167  }
1168  _Bool k_reclaim = (!opt.inspect && (kmr_key_pointer_p(kvi)));
1169  _Bool v_reclaim = (!opt.inspect && (kmr_value_pointer_p(kvi)));
1170  long evsz = mr->mapper_park_size;
1171  struct kmr_kv_box *
1172  ev = kmr_malloc((sizeof(struct kmr_kv_box) * (size_t)evsz));
1173  long evcnt = 0;
1174  long mapcount = 0;
1175  long nextindex = from;
1176  long index = 0;
1177  kvi->c.current_block = kvi->c.first_block;
1178  while (index < kvi->c.element_count) {
1179  assert(kvi->c.current_block != 0);
1180  struct kmr_kvs_block *b = kvi->c.current_block;
1181  struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvi, b);
1182  for (int i = 0; i < b->partial_element_count; i++) {
1183  assert(e != 0);
1184  if (index == nextindex && index < limit) {
1185  ev[evcnt++] = kmr_pick_kv(e, kvi);
1186  nextindex = (index + stride);
1187  if (k_reclaim) {
1188  union kmr_unit_sized *w = kmr_point_key(e);
1189  w->p = 0;
1190  }
1191  if (v_reclaim) {
1192  union kmr_unit_sized *w = kmr_point_value(e);
1193  w->p = 0;
1194  }
1195  } else {
1196  if (k_reclaim) {
1197  union kmr_unit_sized *w = kmr_point_key(e);
1198  kmr_free((void *)w->p, (size_t)e->klen);
1199  w->p = 0;
1200  }
1201  if (v_reclaim) {
1202  union kmr_unit_sized *w = kmr_point_value(e);
1203  kmr_free((void *)w->p, (size_t)e->vlen);
1204  w->p = 0;
1205  }
1206  }
1207  if (evcnt >= evsz) {
1208  cc = kmr_map_parked(ev, evcnt, mapcount, k_reclaim, v_reclaim,
1209  kvi, kvo, m, arg, opt);
1210  assert(cc == MPI_SUCCESS);
1211 
1212  if (kmr_ckpt_enabled(mr)) {
1213  kmr_ckpt_save_kvo_block_add(mr, kvo, evcnt);
1214  }
1215 
1216  mapcount += evcnt;
1217  evcnt = 0;
1218 
1219  if (stop_when_some_added) {
1220  _Bool done;
1221  if (mr->stop_at_some_check_globally) {
1222  done = 0;
1223  } else {
1224  done = (kvo->c.element_count != 0);
1225  }
1226  if (done) {
1227  /* Fake as if go to the end. */
1228  index = (kvi->c.element_count - 1);
1229  while (b->next != 0) {
1230  b = b->next;
1231  }
1232  }
1233  }
1234  }
1235  e = kmr_kvs_next(kvi, e, 1);
1236  index++;
1237  }
1238  kvi->c.current_block = b->next;
1239  }
1240  assert(kvi->c.current_block == 0);
1241  if (evcnt > 0) {
1242  cc = kmr_map_parked(ev, evcnt, mapcount, k_reclaim, v_reclaim,
1243  kvi, kvo, m, arg, opt);
1244  assert(cc == MPI_SUCCESS);
1245 
1246  if (kmr_ckpt_enabled(mr)) {
1247  kmr_ckpt_save_kvo_block_add(mr, kvo, evcnt);
1248  }
1249 
1250  mapcount += evcnt;
1251  evcnt = 0;
1252  }
1253  if (kvo != 0 && !opt.keep_open) {
1254  kmr_add_kv_done(kvo);
1255  }
1256 
1257  if (kmr_ckpt_enabled(mr)) {
1258  kmr_ckpt_save_kvo_block_fin(mr, kvo);
1259  }
1260 
1261  if (!opt.inspect) {
1262  kmr_free_kvs(kvi);
1263  }
1264  if (ev != 0) {
1265  kmr_free(ev, (sizeof(struct kmr_kv_box) * (size_t)evsz));
1266  }
1267 
1268  if (kmr_ckpt_enabled(mr)) {
1270  }
1271  return MPI_SUCCESS;
1272 }
1273 
1274 /** Maps simply. It consumes the input key-value stream KVI unless
1275  INSPECT option is marked. The output key-value stream KVO can be
1276  null, but in that case, a map-function cannot add key-value pairs.
1277  The pointer ARG is just passed to a map-function as a general
1278  argument, where accesses to it should be race-free, since a
1279  map-function is called by threads by default. M is the
1280  map-function. See the description on the type ::kmr_mapfn_t. It
1281  copeis the contents of the input KVI to the output KVO, when a
1282  map-function is null. During processing, it first makes an array
1283  pointing to the key-value entries in each data block, and works on
1284  it for ease threading/parallelization. Effective-options:
1285  NOTHREADING, INSPECT, KEEP_OPEN, COLLAPSE, TAKE_CKPT.
1286  See struct kmr_option. */
1287 
1288 int
1289 kmr_map9(_Bool stop_when_some_added,
1290  KMR_KVS *kvi, KMR_KVS *kvo,
1291  void *arg, struct kmr_option opt, kmr_mapfn_t m,
1292  const char *file, const int line, const char *func)
1293 {
1294  int cc;
1295  kmr_assert_kvs_ok(kvi, kvo, 1, 0);
1296  KMR *mr = kvi->c.mr;
1297  struct kmr_option kmr_supported = {.nothreading = 1, .inspect = 1,
1298  .keep_open = 1, .collapse = 1,
1299  .take_ckpt = 1};
1300  kmr_check_fn_options(mr, kmr_supported, opt, __func__);
1301 
1302  kmr_trace_entry_t * kte_start = 0;
1303  if (mr->kmrviz_trace) {
1304  kte_start = kmr_trace_add_entry(mr, KMR_TRACE_EVENT_MAP, 0, kvi, kvo);
1305  }
1306  struct kmr_code_line info;
1307  if (mr->atwork == 0) {
1308  info.file = file;
1309  info.func = func;
1310  info.line = line;
1311  mr->atwork = &info;
1312  }
1313 
1314  if (m != 0) {
1315  cc = kmr_map_skipping(0, 1, 0, stop_when_some_added,
1316  kvi, kvo, arg, opt, m);
1317  } else {
1318  assert(!opt.inspect && !opt.keep_open);
1319  cc = kmr_move_kvs(kvi, kvo, opt);
1320  }
1321 
1322  if (mr->atwork == &info) {
1323  mr->atwork = 0;
1324  }
1325  if (mr->kmrviz_trace) {
1326  if (!opt.inspect) {
1327  kvi = 0;
1328  }
1329  kmr_trace_add_entry(mr, KMR_TRACE_EVENT_MAP, kte_start, kvi, kvo);
1330  }
1331 
1332  return cc;
1333 }
1334 
1335 /** Maps sequentially with rank by rank for debugging. See
1336  kmr_map. */
1337 
1338 int
1340  void *arg, struct kmr_option opt, kmr_mapfn_t m)
1341 {
1342  KMR *mr = kvi->c.mr;
1343  int nprocs = mr->nprocs;
1344  int cc;
1345  if (mr->rank != 0) {
1346  cc = MPI_Recv(0, 0, MPI_INT, (mr->rank - 1),
1347  KMR_TAG_MAP_BY_RANK, mr->comm, MPI_STATUS_IGNORE);
1348  assert(cc == MPI_SUCCESS);
1349  }
1350  cc = kmr_map(kvi, kvo, arg, opt, m);
1351  assert(cc == MPI_SUCCESS);
1352  fflush(0);
1353  if (mr->rank != (nprocs - 1)) {
1354  usleep(1 * 1000);
1355  cc = MPI_Send(0, 0, MPI_INT, (mr->rank + 1),
1356  KMR_TAG_MAP_BY_RANK, mr->comm);
1357  assert(cc == MPI_SUCCESS);
1358  }
1359  return MPI_SUCCESS;
1360 }
1361 
1362 /** Extracts a single key-value pair locally in the key-value stream
1363  KVI. It is an error when zero or more than one entries are in the
1364  KVI. It does not consume the input KVS (INSPECT IMPLIED). The
1365  returned key-value entry must be used before freeing the input
1366  KVS, when it points to an opaque data. */
1367 
1368 int
1370 {
1371  kmr_assert_kvs_ok(kvi, 0, 1, 0);
1372  assert(kvi->c.current_block == 0);
1373  KMR *mr = kvi->c.mr;
1374  kvi->c.current_block = kvi->c.first_block;
1375  if (kvi->c.element_count == 1) {
1376  assert(kvi->c.current_block != 0);
1377  struct kmr_kvs_block *b = kvi->c.current_block;
1378  struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvi, b);
1379  assert(b->partial_element_count == 1);
1380  assert(e != 0);
1381  *kv = kmr_pick_kv(e, kvi);
1382  kvi->c.current_block = 0;
1383  return MPI_SUCCESS;
1384  } else {
1385  if (kvi->c.element_count == 0) {
1386  kmr_warning(mr, 1, "kmr_take_one for no entries");
1387  /*return MPI_ERR_COUNT;*/
1388  } else {
1389  kmr_warning(mr, 1, "kmr_take_one for multiple entries");
1390  /*return MPI_ERR_COUNT;*/
1391  }
1392  MPI_Abort(MPI_COMM_WORLD, 1);
1393  return MPI_SUCCESS;
1394  }
1395 }
1396 
1397 /** Maps once. It calls a map-function once with a dummy key-value
1398  stream and a dummy key-value pair. See kmr_map().
1399  Effective-options: KEEP_OPEN, TAKE_CKPT. See struct kmr_option. */
1400 
1401 int
1402 kmr_map_once(KMR_KVS *kvo, void *arg, struct kmr_option opt,
1403  _Bool rank_zero_only, kmr_mapfn_t m)
1404 {
1405  kmr_assert_kvs_ok(0, kvo, 0, 1);
1406  KMR *mr = kvo->c.mr;
1407  struct kmr_option kmr_supported = {.keep_open = 1, .take_ckpt = 1};
1408  kmr_check_fn_options(mr, kmr_supported, opt, __func__);
1409  int rank = mr->rank;
1410  int cc;
1411 
1412  kmr_trace_entry_t * kte_start = 0;
1413  if (mr->kmrviz_trace) {
1414  kte_start = kmr_trace_add_entry(mr, KMR_TRACE_EVENT_MAP_ONCE, 0,
1415  0, kvo);
1416  }
1417  if (kmr_ckpt_enabled(mr)) {
1418  if (kmr_ckpt_progress_init(0, kvo, opt)) {
1419  kmr_add_kv_done(kvo);
1420  return MPI_SUCCESS;
1421  }
1422  }
1423 
1424  if (!rank_zero_only || rank == 0) {
1425  struct kmr_kv_box kv = {.klen = 0, .vlen = 0, .k.i = 0, .v.i = 0};
1426  cc = (*m)(kv, 0, kvo, arg, 0);
1427  if (cc != MPI_SUCCESS) {
1428  char ee[80];
1429  snprintf(ee, sizeof(ee),
1430  "Map-fn returned with error cc=%d", cc);
1431  kmr_error(mr, ee);
1432  }
1433  }
1434  if (!opt.keep_open) {
1435  cc = kmr_add_kv_done(kvo);
1436  assert(cc == MPI_SUCCESS);
1437  }
1438 
1439  if (kmr_ckpt_enabled(mr)) {
1440  kmr_ckpt_save_kvo_whole(mr, kvo);
1442  }
1443  if (mr->kmrviz_trace) {
1444  kmr_trace_add_entry(mr, KMR_TRACE_EVENT_MAP_ONCE, kte_start, 0, kvo);
1445  }
1446 
1447  return MPI_SUCCESS;
1448 }
1449 
1450 /** Maps on rank0 only. It calls a map-function once with a dummy
1451  key-value stream and a dummy key-value pair. It is used to avoid
1452  low-level conditionals like (myrank==0). See kmr_map().
1453  Effective-options: KEEP_OPEN, TAKE_CKPT. See struct kmr_option. */
1454 
1455 int
1456 kmr_map_on_rank_zero(KMR_KVS *kvo, void *arg, struct kmr_option opt,
1457  kmr_mapfn_t m)
1458 {
1459  int cc;
1460  cc = kmr_map_once(kvo, arg, opt, 1, m);
1461  assert(cc == MPI_SUCCESS);
1462  return MPI_SUCCESS;
1463 }
1464 
1465 /* ================================================================ */
1466 
1467 #if 1
1468 
1469 /* Hashes. (MurmurHash3; http://code.google.com/p/smhasher). */
1470 
1471 static inline unsigned long
1472 kmr_hash_key_opaque(const unsigned char *p, int n)
1473 {
1474 #define ROT(X,N) ((X) << (N) | (X) >> (64-(N)))
1475 #define KEY(V) (k = (V), k *= 0x87c37b91114253d5UL, \
1476  k = ROT(k, 31), k *= 0x4cf5ad432745937fUL)
1477 #define MIX() (h ^= k, h = ROT(h, 31), h = h * 5 + 0xe6546b64)
1478 #define FIN() (h ^= (h >> 33), h *= 0xff51afd7ed558ccdUL, h ^= (h >> 33), \
1479  h *= 0xc4ceb9fe1a85ec53UL, h ^= (h >> 33))
1480  unsigned long h = 0x85ebca6bUL; /*c2b2ae35*/
1481  unsigned long k;
1482  const unsigned long *v = (void *)p;
1483  int n8 = (n / 8);
1484  int rn = (n - (8 * n8));
1485  const unsigned char *r = &p[(8 * n8)];
1486  for (int i = 0; i < n8; i++) {
1487  KEY(v[i]);
1488  MIX();
1489  }
1490  union {unsigned long i; unsigned char c[8];} u = {.i = 0UL};
1491  for (int i = 0; i < rn; i++) {
1492  u.c[i] = r[i];
1493  }
1494  KEY(u.i);
1495  MIX();
1496  FIN();
1497  return h;
1498 #undef ROT
1499 #undef KEY
1500 #undef MIX
1501 #undef FIN
1502 }
1503 
1504 #elif 0
1505 
1506 /* (java 1.2 hash). */
1507 
1508 static inline unsigned long
1509 kmr_hash_key_opaque(const unsigned char *p, int n)
1510 {
1511  unsigned long hash = 0;
1512  for (i = 0 ; i < n ; i++) {
1513  hash *= 31;
1514  hash += p[i];
1515  }
1516  return hash;
1517 }
1518 
1519 #else
1520 
1521 /* (java 1.0 hash). */
1522 
1523 static inline unsigned long
1524 kmr_hash_key_opaque(const unsigned char *p, int n)
1525 {
1526  unsigned long hash = 0;
1527  int m, k, i;
1528  if (n <= 15) {
1529  m = 1;
1530  for (i = 0 ; i < n ; i++) {
1531  hash += (p[i] * m);
1532  m *= 37;
1533  }
1534  } else {
1535  m = 1;
1536  k = n / 8;
1537  for (i = 0 ; i < n ; i += k) {
1538  hash += (p[i] * m);
1539  m *= 39;
1540  }
1541  }
1542  return hash;
1543 }
1544 
1545 #endif
1546 
1547 /* Makes an integer key for a key-value pair for shuffling. It
1548  returns the long bit representation for float. This hash is OK for
1549  the key_as_rank option. (kmr_hash_key() is for shuffling, and
1550  kmr_stable_key() is for sorting). */
1551 
1552 static inline signed long
1553 kmr_hash_key(const KMR_KVS *kvs, const struct kmr_kv_box kv)
1554 {
1555  switch (kvs->c.key_data) {
1556  case KMR_KV_BAD:
1557  xassert(kvs->c.key_data != KMR_KV_BAD);
1558  return -1;
1559  case KMR_KV_OPAQUE:
1560  case KMR_KV_CSTRING:
1561  return (signed long)kmr_hash_key_opaque((void *)kv.k.p, kv.klen);
1562  case KMR_KV_INTEGER:
1563  return kv.k.i;
1564  case KMR_KV_FLOAT8:
1565  return kv.k.i;
1566  case KMR_KV_POINTER_OWNED:
1567  case KMR_KV_POINTER_UNMANAGED:
1568  xassert(kvs->c.key_data != KMR_KV_POINTER_OWNED
1569  && kvs->c.key_data != KMR_KV_POINTER_UNMANAGED);
1570  return -1;
1571  default:
1572  xassert(NEVERHERE);
1573  return -1;
1574  }
1575 }
1576 
1577 /* Returns an integer of the first 8 bytes, shifted to the right by
1578  one-bit to make it positive. It is consistent with the ordering by
1579  memcmp(). */
1580 
1581 static inline long
1582 kmr_stable_key_opaque(const struct kmr_kv_box kv)
1583 {
1584  unsigned char *p = (unsigned char *)kv.k.p;
1585  int n = kv.klen;
1586  unsigned long hash = 0;
1587  for (int i = 0; i < (int)sizeof(long); i++) {
1588  unsigned char v = ((i < n) ? p[i] : 0);
1589  hash = ((hash << 8) + v);
1590  }
1591  return (long)(hash >> 1);
1592 }
1593 
1594 /* Makes an integer key for a key-value pair for sorting. It returns
1595  a signed value for comparing integers. It is consistent with the
1596  ordering by memcmp() for opaque keys. (kmr_hash_key() is for
1597  shuffling, and kmr_stable_key() is for sorting). */
1598 
1599 signed long
1600 kmr_stable_key(const struct kmr_kv_box kv, const KMR_KVS *kvs)
1601 {
1602  switch (kvs->c.key_data) {
1603  case KMR_KV_BAD:
1604  xassert(kvs->c.key_data != KMR_KV_BAD);
1605  return -1;
1606  case KMR_KV_OPAQUE:
1607  case KMR_KV_CSTRING:
1608  return kmr_stable_key_opaque(kv);
1609  case KMR_KV_INTEGER:
1610  return kv.k.i;
1611  case KMR_KV_FLOAT8:
1612  {
1613  long v0 = kv.k.i;
1614  long v1 = ((v0 >= 0L) ? v0 : ((-v0) | (1L << 63)));
1615  /*assert(v0 >= 0 || v1 < 0);*/
1616  return v1;
1617  }
1618  case KMR_KV_POINTER_OWNED:
1619  case KMR_KV_POINTER_UNMANAGED:
1620  xassert(kvs->c.key_data != KMR_KV_POINTER_OWNED
1621  && kvs->c.key_data != KMR_KV_POINTER_UNMANAGED);
1622  return -1;
1623  default:
1624  xassert(NEVERHERE);
1625  return -1;
1626  }
1627 }
1628 
1629 /* Determines a rank to which this key-value entry is directed. It is
1630  bases on the hashed keys. */
1631 
1632 int
1633 kmr_pitch_rank(const struct kmr_kv_box kv, KMR_KVS *kvs)
1634 {
1635  unsigned int nprocs = (unsigned int)kvs->c.mr->nprocs;
1636  unsigned long v = (unsigned long)kmr_hash_key(kvs, kv);
1637  unsigned int h = (((v >> 32) ^ v) & ((1L << 32) - 1));
1638  return (int)(h % nprocs);
1639 }
1640 
1641 /* Compares in three-way, returning -1, 0, or 1. */
1642 
1643 #define KMR_CMP3(X, Y) (((X) == (Y)) ? 0 : ((X) < (Y)) ? -1 : 1)
1644 
1645 /* Compares byte strings lexicographically. This is compatible to
1646  memcmp() when the lengths are equal, for the terasort
1647  requirement. */
1648 
1649 static inline int
1650 kmr_compare_lexicographically(const unsigned char *p, const int plen,
1651  const unsigned char *q, const int qlen)
1652 {
1653  int s = MIN(plen, qlen);
1654 #if 0
1655  for (int i = 0; i < s; i++) {
1656  if (p[i] != q[i]) {
1657  return (p[i] - q[i]);
1658  }
1659  }
1660 #endif
1661  int cc = memcmp(p, q, (size_t)s);
1662  if (cc != 0) {
1663  return cc;
1664  } else {
1665  return (plen - qlen);
1666  }
1667 }
1668 
1669 /* Compares keys lexicographically as byte strings. */
1670 
1671 static int
1672 kmr_compare_opaque(const struct kmr_kv_box *p,
1673  const struct kmr_kv_box *q)
1674 {
1675  return kmr_compare_lexicographically((unsigned char *)p->k.p, p->klen,
1676  (unsigned char *)q->k.p, q->klen);
1677 }
1678 
1679 static int
1680 kmr_compare_integer(const struct kmr_kv_box *p0,
1681  const struct kmr_kv_box *p1)
1682 {
1683  return KMR_CMP3(p0->k.i, p1->k.i);
1684 }
1685 
1686 static int
1687 kmr_compare_float8(const struct kmr_kv_box *p0,
1688  const struct kmr_kv_box *p1)
1689 {
1690  return KMR_CMP3(p0->k.d, p1->k.d);
1691 }
1692 
1693 /* Compares keys lexicographically as byte strings. */
1694 
1695 static int
1696 kmr_compare_record_opaque(const struct kmr_keyed_record *p0,
1697  const struct kmr_keyed_record *p1)
1698 {
1699  struct kmr_kv_box b0 = kmr_pick_kv2(p0->e, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
1700  struct kmr_kv_box b1 = kmr_pick_kv2(p1->e, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
1701  return kmr_compare_lexicographically((unsigned char *)b0.k.p, b0.klen,
1702  (unsigned char *)b1.k.p, b1.klen);
1703 }
1704 
1705 /* (UNUSED) Sorting uses the key part of the record and it does not
1706  peek in the kmr_kv_box. */
1707 
1708 static int
1709 kmr_compare_record_integer_(const struct kmr_keyed_record *p0,
1710  const struct kmr_keyed_record *p1)
1711 {
1712  assert(NEVERHERE);
1713  struct kmr_kv_box b0 = kmr_pick_kv2(p0->e, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
1714  struct kmr_kv_box b1 = kmr_pick_kv2(p1->e, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
1715  long v0 = b0.k.i;
1716  long v1 = b1.k.i;
1717  return KMR_CMP3(v0, v1);
1718 }
1719 
1720 /* (UNUSED) Sorting uses the key part of the record and it does not
1721  peek in the kmr_kv_box. */
1722 
1723 static int
1724 kmr_compare_record_float8_(const struct kmr_keyed_record *p0,
1725  const struct kmr_keyed_record *p1)
1726 {
1727  assert(NEVERHERE);
1728  struct kmr_kv_box b0 = kmr_pick_kv2(p0->e, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
1729  struct kmr_kv_box b1 = kmr_pick_kv2(p1->e, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
1730  double v0 = b0.k.d;
1731  double v1 = b1.k.d;
1732  return KMR_CMP3(v0, v1);
1733 }
1734 
1735 /* Returns an appropriate comparator for kmr_kv_box. */
1736 
1737 kmr_sorter_t
1738 kmr_choose_sorter(const KMR_KVS *kvs)
1739 {
1740  switch (kvs->c.key_data) {
1741  case KMR_KV_BAD:
1742  xassert(kvs->c.key_data != KMR_KV_BAD);
1743  return 0;
1744  case KMR_KV_INTEGER:
1745  return kmr_compare_integer;
1746  case KMR_KV_FLOAT8:
1747  return kmr_compare_float8;
1748  case KMR_KV_OPAQUE:
1749  case KMR_KV_CSTRING:
1750  case KMR_KV_POINTER_OWNED:
1751  case KMR_KV_POINTER_UNMANAGED:
1752  return kmr_compare_opaque;
1753  default:
1754  xassert(NEVERHERE);
1755  return 0;
1756  }
1757 }
1758 
1759 /* Returns an appropriate comparator for keyed-records. It is only
1760  called for KMR_KV_OPAQUE. */
1761 
1762 static kmr_record_sorter_t
1763 kmr_choose_record_sorter(const KMR_KVS *kvs)
1764 {
1765  switch (kvs->c.key_data) {
1766  case KMR_KV_BAD:
1767  xassert(kvs->c.key_data != KMR_KV_BAD);
1768  return 0;
1769  case KMR_KV_INTEGER:
1770  assert(NEVERHERE);
1771  return kmr_compare_record_integer_;
1772  case KMR_KV_FLOAT8:
1773  assert(NEVERHERE);
1774  return kmr_compare_record_float8_;
1775  case KMR_KV_OPAQUE:
1776  case KMR_KV_CSTRING:
1777  case KMR_KV_POINTER_OWNED:
1778  case KMR_KV_POINTER_UNMANAGED:
1779  return kmr_compare_record_opaque;
1780  default:
1781  xassert(NEVERHERE);
1782  return 0;
1783  }
1784 }
1785 
1786 #if 0
1787 
1788 /* Copies the entries as keyed-records with hashed keys for
1789  shuffling. */
1790 
1791 static int
1792 kmr_copy_record_shuffle_fn(const struct kmr_kv_box kv,
1793  const KMR_KVS *kvi, KMR_KVS *kvo, void *p,
1794  const long i)
1795 {
1796  return MPI_SUCCESS;
1797 }
1798 
1799 /* Copies the entries as keyed-records with stable keys for
1800  sorting. */
1801 
1802 static int
1803 kmr_copy_record_sorting_fn(const struct kmr_kv_box kv,
1804  const KMR_KVS *kvi, KMR_KVS *kvo, void *p,
1805  const long i)
1806 {
1807  return MPI_SUCCESS;
1808 }
1809 
1810 #endif
1811 
1812 /** Compares the key field of keyed-records for qsort/bsearch. */
1813 
1814 static int
1815 kmr_icmp(const void *a0, const void *a1)
1816 {
1817  const struct kmr_keyed_record *p0 = a0;
1818  const struct kmr_keyed_record *p1 = a1;
1819  long v0 = p0->v;
1820  long v1 = p1->v;
1821  return KMR_CMP3(v0, v1);
1822 }
1823 
1824 static int
1825 kmr_sort_locally_lo(KMR_KVS *kvi, KMR_KVS *kvo, _Bool shuffling,
1826  _Bool ranking, struct kmr_option opt)
1827 {
1828  kmr_assert_kvs_ok(kvi, kvo, 1, 1);
1829  assert(kmr_shuffle_compatible_p(kvo, kvi));
1830  KMR *mr = kvi->c.mr;
1831 
1832  kmr_trace_entry_t * kte_start = 0;
1833  if (mr->kmrviz_trace) {
1834  kte_start = kmr_trace_add_entry(mr, KMR_TRACE_EVENT_SORT, 0, kvi, kvo);
1835  }
1836  if (kmr_ckpt_enabled(mr)) {
1837  if (kmr_ckpt_progress_init(kvi, kvo, opt)) {
1838  kmr_add_kv_done(kvo);
1839  kvo->c.sorted = 1;
1840  if (!opt.inspect) {
1841  kmr_free_kvs(kvi);
1842  }
1843  return MPI_SUCCESS;
1844  }
1845  }
1846  _Bool twostep = !mr->one_step_sort;
1847  _Bool primekey = ((kvi->c.key_data == KMR_KV_INTEGER)
1848  || (kvi->c.key_data == KMR_KV_FLOAT8));
1849  double timestamp[5];
1850  int cc;
1851  long cnt = kvi->c.element_count;
1852  timestamp[0] = MPI_Wtime();
1853  size_t evsz = (sizeof(struct kmr_keyed_record) * (size_t)cnt);
1854  struct kmr_keyed_record *ev = kmr_malloc(evsz);
1855 #if 0
1856  struct kmr_option inspect = {
1857  .inspect = 1,
1858  .nothreading = opt.nothreading
1859  };
1860  if (shuffling) {
1861  cc = kmr_map(kvi, 0, ev, inspect, kmr_copy_record_shuffle_fn);
1862  assert(cc == MPI_SUCCESS);
1863  } else {
1864  cc = kmr_map(kvi, 0, ev, inspect, kmr_copy_record_sorting_fn);
1865  assert(cc == MPI_SUCCESS);
1866  }
1867 #else
1868  cc = kmr_retrieve_keyed_records(kvi, ev, cnt, shuffling, ranking);
1869  assert(cc == MPI_SUCCESS);
1870 #endif
1871  timestamp[1] = MPI_Wtime();
1872  if (shuffling || twostep || primekey) {
1873  if (mr->single_thread || opt.nothreading) {
1874  qsort(ev, (size_t)cnt, sizeof(struct kmr_keyed_record), kmr_icmp);
1875  } else {
1876  kmr_isort(ev, (size_t)cnt, sizeof(struct kmr_keyed_record),
1877  mr->sort_threads_depth);
1878  }
1879  }
1880  timestamp[2] = MPI_Wtime();
1881  if (!shuffling && !primekey) {
1882  /* Sort the array sorted by hashed keys, again by true keys. */
1883  long *runs = kmr_malloc(sizeof(long) * (size_t)cnt);
1884  long nruns = 0;
1885  if (twostep) {
1886  long i = 0;
1887  while (i < cnt) {
1888  do {
1889  i++;
1890  if (i == cnt) {
1891  break;
1892  }
1893  cc = KMR_CMP3(ev[i - 1].v, ev[i].v);
1894  } while (cc == 0);
1895  assert(nruns < cnt);
1896  runs[nruns] = i;
1897  nruns++;
1898  }
1899  assert(i == cnt && (cnt == 0 || runs[nruns - 1] == cnt));
1900  } else {
1901  nruns = (cnt == 0 ? 0 : 1);
1902  runs[0] = cnt;
1903  }
1904  kmr_record_sorter_t cmp1 = kmr_choose_record_sorter(kvi);
1905  if (mr->single_thread || opt.nothreading) {
1906  for (long k = 0; k < nruns; k++) {
1907  long j = (k == 0 ? 0 : runs[k - 1]);
1908  long i = runs[k];
1909  assert(j < i);
1910  if ((i - j) > 1) {
1911  qsort(&ev[j], (size_t)(i - j),
1912  sizeof(struct kmr_keyed_record),
1913  (kmr_qsorter_t)cmp1);
1914  }
1915  }
1916  } else {
1917  KMR_OMP_PARALLEL_FOR_
1918  for (long k = 0; k < nruns; k++) {
1919  long j = (k == 0 ? 0 : runs[k - 1]);
1920  long i = runs[k];
1921  assert(j < i);
1922  if ((i - j) > 1) {
1923  qsort(&ev[j], (size_t)(i - j),
1924  sizeof(struct kmr_keyed_record),
1925  (kmr_qsorter_t)cmp1);
1926  }
1927  }
1928  }
1929  kmr_free(runs, (sizeof(long) * (size_t)cnt));
1930  }
1931  timestamp[3] = MPI_Wtime();
1932  size_t sz = kvi->c.storage_netsize;
1933  cc = kmr_allocate_block(kvo, sz);
1934  assert(cc == MPI_SUCCESS);
1935  /*NEED-THREADING*/
1936  for (long i = 0 ; i < cnt; i++) {
1937  struct kmr_kv_box kv = kmr_pick_kv(ev[i].e, kvi);
1938  kmr_add_kv_nomutex(kvo, kv, 0, 0);
1939  }
1940  timestamp[4] = MPI_Wtime();
1941  assert(sz == 0 || kmr_kvs_entry_tail_p(kvo->c.adding_point));
1942  assert(sz == 0 || kvo->c.block_count == 1);
1943  kmr_add_kv_done(kvo);
1944  kmr_assert_on_tail_marker(kvo);
1945  kvo->c.sorted = 1;
1946  kmr_free(ev, evsz);
1947  _Bool tracing = mr->trace_sorting;
1948  if (tracing && (5 <= mr->verbosity)) {
1949  fprintf(stderr, (";;KMR [%05d] kmr_sort_locally"
1950  " time=(%f %f %f %f) (msec)\n"),
1951  mr->rank,
1952  ((timestamp[1] - timestamp[0]) * 1e3),
1953  ((timestamp[2] - timestamp[1]) * 1e3),
1954  ((timestamp[3] - timestamp[2]) * 1e3),
1955  ((timestamp[4] - timestamp[3]) * 1e3));
1956  fflush(0);
1957  }
1958  if (kmr_ckpt_enabled(mr)) {
1959  kmr_ckpt_save_kvo_whole(mr, kvo);
1960  }
1961  if (!opt.inspect) {
1962  kmr_free_kvs(kvi);
1963  kvi = 0;
1964  }
1965  if (kmr_ckpt_enabled(mr)) {
1967  }
1968  if (mr->kmrviz_trace) {
1969  kmr_trace_add_entry(mr, KMR_TRACE_EVENT_SORT, kte_start, kvi, kvo);
1970  }
1971 
1972  return MPI_SUCCESS;
1973 }
1974 
1975 /** Reorders key-value pairs in a single rank. It sorts pairs when
1976  SHUFFLING is false, or gathers pairs with the same hashed keys
1977  adjacent when SHUFFLING is true. It only respects for not
1978  ordering but just equality when shuffling. The sort-keys for
1979  shuffling are destination ranks for shuffling (taking a modulo of
1980  the hashed key with nprocs). As a sorting, it is NOT-STABLE due
1981  to quick-sort used inside. It converts pointer keys and values to
1982  opaque ones for sending.\n
1983  Sorting on a key-value stream is by memcmp(), unless the keys are
1984  integer or floating-point numbers (ordering on integers and
1985  memcmp() are different). Sorting on non-numbers is performed in
1986  two steps: the first step sorts by the integer rankings, and the
1987  second by the specified comparator. And thus, the comparator is
1988  required to have a corresponding generator of integer rankings.
1989  It consumes the input key-value stream. Effective-options:
1990  NOTHREADING, INSPECT, KEY_AS_RANK. */
1991 
1992 int
1993 kmr_sort_locally(KMR_KVS *kvi, KMR_KVS *kvo, _Bool shuffling,
1994  struct kmr_option opt)
1995 {
1996  kmr_assert_kvs_ok(kvi, kvo, 1, 1);
1997  assert(kmr_shuffle_compatible_p(kvo, kvi));
1998  struct kmr_option kmr_supported = {.nothreading = 1, .inspect = 1,
1999  .key_as_rank = 1};
2000  kmr_check_fn_options(kvi->c.mr, kmr_supported, opt, __func__);
2001  _Bool ranking = opt.key_as_rank;
2002  kmr_sort_locally_lo(kvi, kvo, shuffling, ranking, opt);
2003  return MPI_SUCCESS;
2004 }
2005 
2006 /* ================================================================ */
2007 
2008 /* Counts the number of entries in the key-value stream. If
2009  BOUND_IN_BLOCK is true, it counts only ones in a single data
2010  block. */
2011 
2012 static inline long
2013 kmr_count_entries(KMR_KVS *kvs, _Bool bound_in_block)
2014 {
2015  kvs->c.current_block = kvs->c.first_block;
2016  struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvs, kvs->c.first_block);
2017  long cnt = 0;
2018  while (e != 0) {
2019  /*struct kmr_kv_box kv = kmr_pick_kv(e, kvs);*/
2020  /*printf("entry %d %d %s %s\n", kv.klen, kv.vlen, kv.k.p, kv.v.p);*/
2021  cnt++;
2022  e = kmr_kvs_next(kvs, e, bound_in_block);
2023  }
2024  return cnt;
2025 }
2026 
2027 /** Shuffles key-value pairs to the appropriate destination ranks. It
2028  first sorts pairs by the destination ranks of the keys, and then
2029  exchanges pairs with all-to-all communication. It converts
2030  pointer keys and values to opaque ones for sending during the
2031  sorting stage. Note that the key-value pairs are sorted by the
2032  hash-values prior to exchange. Effective-options: INSPECT,
2033  KEY_AS_RANK, TAKE_CKPT. See struct kmr_option. */
2034 
2035 int
2036 kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
2037 {
2038  kmr_assert_kvs_ok(kvi, kvo, 1, 1);
2039  assert(kmr_shuffle_compatible_p(kvo, kvi));
2040  KMR *mr = kvi->c.mr;
2041  struct kmr_option kmr_supported = {.inspect = 1, .key_as_rank = 1,
2042  .take_ckpt = 1};
2043  kmr_check_fn_options(mr, kmr_supported, opt, __func__);
2044  _Bool ranking = opt.key_as_rank;
2045 
2046  kmr_trace_entry_t * kte_start = 0;
2047  if (mr->kmrviz_trace) {
2048  kte_start = kmr_trace_add_entry(mr, KMR_TRACE_EVENT_SHUFFLE, 0,
2049  kvi, kvo);
2050  }
2051 
2052  /* SKIP SHUFFLING IF MARKED AS SHUFFLED. */
2053 
2054  if (kvi->c.magic == KMR_KVS_PUSHOFF) {
2055  kmr_pushoff_make_stationary(kvi);
2056  }
2057  if (kvi->c.shuffled_in_pushoff) {
2058  assert(!mr->ckpt_enable);
2059  int cc = kmr_move_kvs(kvi, kvo, opt);
2060  return cc;
2061  }
2062 
2063  if (kmr_ckpt_enabled(mr)) {
2064  if (kmr_ckpt_progress_init(kvi, kvo, opt)) {
2065  kmr_add_kv_done(kvo);
2066  if (!opt.inspect) {
2067  kmr_free_kvs(kvi);
2068  }
2069  return MPI_SUCCESS;
2070  }
2071  }
2072  int kcdc = kmr_ckpt_disable_ckpt(mr);
2073 
2074  /* Sort for shuffling. */
2075 
2076  enum kmr_kv_field keyf = kmr_unit_sized_or_opaque(kvi->c.key_data);
2077  enum kmr_kv_field valf = kmr_unit_sized_or_opaque(kvi->c.value_data);
2078  struct kmr_option n_opt = opt;
2079  n_opt.inspect = 1;
2080  KMR_KVS *kvs1 = kmr_create_kvs(mr, keyf, valf);
2081  kmr_sort_locally_lo(kvi, kvs1, 1, ranking, n_opt);
2082  assert(kvs1->c.stowed);
2083  /*kmr_dump_kvs(kvs1, 0);*/
2084  /*kmr_guess_communication_pattern_(kvs1, opt);*/
2085  assert(!kmr_fields_pointer_p(kvs1));
2086  assert(kvs1->c.block_count <= 1);
2087 
2088  int cc;
2089  int nprocs = mr->nprocs;
2090  long cnt = kvs1->c.element_count;
2091  long *ssz = kmr_malloc(sizeof(long) * (size_t)nprocs);
2092  long *sdp = kmr_malloc(sizeof(long) * (size_t)nprocs);
2093  long *rsz = kmr_malloc(sizeof(long) * (size_t)nprocs);
2094  long *rdp = kmr_malloc(sizeof(long) * (size_t)nprocs);
2095  for (int r = 0; r < nprocs; r++) {
2096  ssz[r] = 0;
2097  rsz[r] = 0;
2098  }
2099  int rank = 0;
2100  assert(kvs1->c.current_block == 0);
2101  kvs1->c.current_block = kvs1->c.first_block;
2102  struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvs1, kvs1->c.first_block);
2103  for (long i = 0; i < cnt; i++) {
2104  assert(e != 0);
2105  struct kmr_kv_box kv = kmr_pick_kv(e, kvs1);
2106  int r = (ranking ? (int)kv.k.i : kmr_pitch_rank(kv, kvs1));
2107  assert(0 <= r && r < nprocs);
2108  if (ranking && !(0 <= r && r < nprocs)) {
2109  kmr_error(mr, "key entries are not ranks");
2110  }
2111  if (r < rank) {
2112  kmr_error(mr, "key-value entries are not sorted (internal error)");
2113  }
2114  ssz[r] += (long)kmr_kvs_entry_netsize(e);
2115  rank = r;
2116  e = kmr_kvs_next(kvs1, e, 0);
2117  }
2118  /* Exchange send-receive counts. */
2119  cc = kmr_exchange_sizes(mr, ssz, rsz);
2120  assert(cc == MPI_SUCCESS);
2121  long sendsz = 0;
2122  long recvsz = 0;
2123  for (int r = 0; r < nprocs; r++) {
2124  sdp[r] = sendsz;
2125  sendsz += ssz[r];
2126  rdp[r] = recvsz;
2127  recvsz += rsz[r];
2128  }
2129  cc = kmr_allocate_block(kvo, (size_t)recvsz);
2130  assert(cc == MPI_SUCCESS);
2131  struct kmr_kvs_block *sb = kvs1->c.first_block;
2132  struct kmr_kvs_entry *sbuf = kmr_kvs_first_entry(kvs1, sb);
2133  struct kmr_kvs_block *rb = kvo->c.first_block;
2134  struct kmr_kvs_entry *rbuf = kmr_kvs_first_entry(kvo, rb);
2135  cc = kmr_alltoallv(mr, sbuf, ssz, sdp, rbuf, rsz, rdp);
2136  assert(cc == MPI_SUCCESS);
2137  long ocnt = kmr_count_entries(kvo, 1);
2138  assert(kvo->c.sorted == 0);
2139  kvo->c.element_count = ocnt;
2140  if (recvsz != 0) {
2141  assert(kvo->c.block_count == 1);
2142  rb->partial_element_count = ocnt;
2143  rb->fill_size = (size_t)recvsz;
2144  }
2145  kmr_kvs_adjust_adding_point(kvo);
2146  kmr_add_kv_done(kvo);
2147 
2148  kmr_ckpt_enable_ckpt(mr, kcdc);
2149  if (kmr_ckpt_enabled(mr)) {
2150  kmr_ckpt_save_kvo_whole(mr, kvo);
2151  }
2152 
2153  if (!opt.inspect) {
2154  kmr_free_kvs(kvi);
2155  kvi = 0;
2156  }
2157  assert(kvo->c.element_count == 0 || kvo->c.storage_netsize != 0);
2158  xassert(!kmr_fields_pointer_p(kvo));
2159  kmr_free_kvs(kvs1);
2160  kmr_free(ssz, (sizeof(long) * (size_t)nprocs));
2161  kmr_free(sdp, (sizeof(long) * (size_t)nprocs));
2162  kmr_free(rsz, (sizeof(long) * (size_t)nprocs));
2163  kmr_free(rdp, (sizeof(long) * (size_t)nprocs));
2164  if (kmr_ckpt_enabled(mr)) {
2166  }
2167  if (mr->kmrviz_trace) {
2168  kmr_trace_add_entry(mr, KMR_TRACE_EVENT_SHUFFLE, kte_start, kvi, kvo);
2169  }
2170 
2171  return MPI_SUCCESS;
2172 }
2173 
2174 /** Replicates key-value pairs to be visible on all ranks, that is, it
2175  has the effect of bcast or all-gather. It gathers pairs on rank0
2176  only by the option RANK_ZERO. It moves stably, keeping the
2177  ordering of ranks and the ordering of local key-value pairs.
2178  Effective-options: INSPECT, RANK_ZERO, TAKE_CKPT.
2179  See struct kmr_option. */
2180 
2181 int
2182 kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
2183 {
2184  kmr_assert_kvs_ok(kvi, kvo, 1, 1);
2185  KMR *mr = kvi->c.mr;
2186  struct kmr_option kmr_supported = {.inspect = 1, .rank_zero = 1,
2187  .take_ckpt = 1};
2188  kmr_check_fn_options(mr, kmr_supported, opt, __func__);
2189  int nprocs = mr->nprocs;
2190  int rank = mr->rank;
2191  int cc;
2192  KMR_KVS *kvs1;
2193 
2194  kmr_trace_entry_t * kte_start = 0;
2195  if (mr->kmrviz_trace) {
2196  kte_start = kmr_trace_add_entry(mr, KMR_TRACE_EVENT_REPLICATE, 0,
2197  kvi, kvo);
2198  }
2199 
2200  if (kmr_fields_pointer_p(kvi) || kvi->c.block_count > 1) {
2201  enum kmr_kv_field keyf = kvi->c.key_data;
2202  enum kmr_kv_field valf = kvi->c.value_data;
2203  kvs1 = kmr_create_kvs(mr, keyf, valf);
2204 
2205  int kcdc = kmr_ckpt_disable_ckpt(mr);
2206  cc = kmr_collapse_as_opaque(kvi, kvs1, 1);
2207  assert(cc == MPI_SUCCESS);
2208  kmr_ckpt_enable_ckpt(mr, kcdc);
2209  } else {
2210  kvs1 = kvi;
2211  }
2212  kmr_assert_on_tail_marker(kvs1);
2213  assert(kvs1->c.block_count <= 1);
2214 
2215  if (kmr_ckpt_enabled(mr)) {
2216  if (kmr_ckpt_progress_init(kvi, kvo, opt)) {
2217  kmr_add_kv_done(kvo);
2218  if (kvs1 != kvi) {
2219  cc = kmr_free_kvs(kvs1);
2220  assert(cc == MPI_SUCCESS);
2221  }
2222  if (!opt.inspect) {
2223  cc = kmr_free_kvs(kvi);
2224  assert(cc == MPI_SUCCESS);
2225  }
2226  return MPI_SUCCESS;
2227  }
2228  }
2229 
2230  long *rsz = kmr_malloc(sizeof(long) * (size_t)nprocs);
2231  long *rdp = kmr_malloc(sizeof(long) * (size_t)nprocs);
2232  /* Exchange send-receive counts. */
2233  long ssz = (long)kvs1->c.storage_netsize;
2234  cc = kmr_gather_sizes(mr, ssz, rsz);
2235  assert(cc == MPI_SUCCESS);
2236  long recvsz = 0;
2237  if (!opt.rank_zero || rank == 0) {
2238  for (int r = 0; r < nprocs; r++) {
2239  rdp[r] = recvsz;
2240  recvsz += rsz[r];
2241  }
2242  }
2243  if (!(kvo->c.key_data == kvs1->c.key_data
2244  && kvo->c.value_data == kvs1->c.value_data)) {
2245  kmr_error(mr, "key-data or value-data types mismatch");
2246  }
2247  cc = kmr_allocate_block(kvo, (size_t)recvsz);
2248  assert(cc == MPI_SUCCESS);
2249  struct kmr_kvs_block *sb = kvs1->c.first_block;
2250  struct kmr_kvs_entry *sbuf = kmr_kvs_first_entry(kvs1, sb);
2251  struct kmr_kvs_block *rb = kvo->c.first_block;
2252  struct kmr_kvs_entry *rbuf = kmr_kvs_first_entry(kvo, rb);
2253  cc = kmr_allgatherv(mr, opt.rank_zero, sbuf, ssz, rbuf, rsz, rdp);
2254  assert(cc == MPI_SUCCESS);
2255  assert(kvo->c.element_count == 0);
2256  long ocnt = kmr_count_entries(kvo, 1);
2257  kvo->c.element_count = ocnt;
2258  if (recvsz != 0) {
2259  rb->partial_element_count = ocnt;
2260  rb->fill_size = (size_t)recvsz;
2261  }
2262  kmr_kvs_adjust_adding_point(kvo);
2263  cc = kmr_add_kv_done(kvo);
2264  assert(cc == MPI_SUCCESS);
2265  kmr_assert_on_tail_marker(kvo);
2266  assert(kvo->c.element_count == 0 || kvo->c.storage_netsize != 0);
2267 
2268  if (kmr_ckpt_enabled(mr)) {
2269  kmr_ckpt_save_kvo_whole(mr, kvo);
2270  }
2271 
2272  if (kvs1 != kvi) {
2273  cc = kmr_free_kvs(kvs1);
2274  assert(cc == MPI_SUCCESS);
2275  }
2276  if (!opt.inspect) {
2277  cc = kmr_free_kvs(kvi);
2278  assert(cc == MPI_SUCCESS);
2279  kvi = 0;
2280  }
2281  kmr_free(rsz, (sizeof(long) * (size_t)nprocs));
2282  kmr_free(rdp, (sizeof(long) * (size_t)nprocs));
2283 
2284  if (kmr_ckpt_enabled(mr)) {
2286  }
2287  if (mr->kmrviz_trace) {
2288  kmr_trace_add_entry(mr, KMR_TRACE_EVENT_REPLICATE, kte_start,
2289  kvi, kvo);
2290  }
2291  return MPI_SUCCESS;
2292 }
2293 
2294 /* ================================================================ */
2295 
2296 /* Copies the entires as the keyed-records. The keyed-records hould
2297  hashed keys for sorting. */
2298 
2299 static int
2300 kmr_copy_record_fn(const struct kmr_kv_box kv,
2301  const KMR_KVS *kvi, KMR_KVS *kvo, void *p,
2302  const long i)
2303 {
2304  struct kmr_kv_box *ev = p;
2305  ev[i] = kv;
2306  return MPI_SUCCESS;
2307 }
2308 
2309 /* Reduces key-value pairs. The version "nothreading" uses less
2310  memory for sort-records than the version "threading". */
2311 
2312 static int
2313 kmr_reduce_nothreading(KMR_KVS *kvi, KMR_KVS *kvo,
2314  void *arg, struct kmr_option opt, kmr_redfn_t r)
2315 {
2316  xassert(kvi->c.current_block == 0);
2317  kmr_sorter_t cmp = kmr_choose_sorter(kvi);
2318  KMR *mr = kvi->c.mr;
2319  long cnt = kvi->c.element_count;
2320  int cc;
2321  struct kmr_kv_box *ev = 0;
2322  long evsz = 0;
2323  kvi->c.current_block = kvi->c.first_block;
2324  struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvi, kvi->c.first_block);
2325  long index = 0;
2326  long redcount = 0;
2327 
2328  if (kmr_ckpt_enabled(mr)) {
2329  if (kmr_ckpt_progress_init(kvi, kvo, opt)) {
2330  if (kvo != 0) {
2331  kmr_add_kv_done(kvo);
2332  }
2333  return MPI_SUCCESS;
2334  }
2335  long start_from = kmr_ckpt_first_unprocessed_kv(mr);
2336  while (index < start_from) {
2337  e = kmr_kvs_next(kvi, e, 1);
2338  index++;
2339  }
2341  }
2342 
2343  for (;;) {
2344  struct kmr_kvs_entry *ej = e;
2345  long n = 0;
2346  while (e != 0) {
2347  assert(index < cnt);
2348  e = kmr_kvs_next(kvi, e, 1);
2349  n++;
2350  index++;
2351  if (e == 0) {
2352  break;
2353  }
2354  //struct kmr_keyed_record s0 = {.v = 0, .e = kmr_pick_kv(ej, kvi)};
2355  //struct kmr_keyed_record s1 = {.v = 0, .e = kmr_pick_kv(e, kvi)};
2356  struct kmr_kv_box kv0 = kmr_pick_kv(ej, kvi);
2357  struct kmr_kv_box kv1 = kmr_pick_kv(e, kvi);
2358  cc = (*cmp)(&kv0, &kv1);
2359  if (cc != 0) {
2360  break;
2361  }
2362  }
2363  if (n == 0) {
2364  assert(ej == 0 && e == 0);
2365  break;
2366  }
2367  assert(n > 0);
2368  if (n > evsz) {
2369  evsz = n;
2370  ev = kmr_realloc(ev, (sizeof(struct kmr_kv_box) * (size_t)evsz));
2371  assert(ev != 0);
2372  }
2373  e = ej;
2374  for (long i = 0; i < n; i++) {
2375  assert(e != 0);
2376  ev[i] = kmr_pick_kv(e, kvi);
2377  e = kmr_kvs_next(kvi, e, 1);
2378  }
2379 
2380  double t0 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
2381  cc = (*r)(ev, n, kvi, kvo, arg);
2382  double t1 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
2383  if (cc != MPI_SUCCESS) {
2384  char ee[80];
2385  snprintf(ee, sizeof(ee),
2386  "Reduce-fn returned with error cc=%d", cc);
2387  kmr_error(mr, ee);
2388  }
2389  if (mr->log_traces != 0) {
2390  kmr_log_reduce(mr, kvi, ev, n, r, (t1 - t0));
2391  }
2392 
2393  if (kmr_ckpt_enabled(mr)) {
2394  kmr_ckpt_save_kvo_block_add(mr, kvo, n);
2395  }
2396 
2397  redcount += n;
2398  }
2399  assert(index == cnt);
2400  if (kmr_ckpt_enabled(mr)) {
2401  kmr_ckpt_save_kvo_block_fin(mr, kvo);
2402  }
2403  if (kvo != 0) {
2404  kmr_add_kv_done(kvo);
2405  }
2406  if (ev != 0) {
2407  kmr_free(ev, (sizeof(struct kmr_kv_box) * (size_t)evsz));
2408  }
2409  return MPI_SUCCESS;
2410 }
2411 
2412 static int
2413 kmr_reduce_threading(_Bool stop_when_some_added,
2414  KMR_KVS *kvi, KMR_KVS *kvo,
2415  void *arg, struct kmr_option opt, kmr_redfn_t r)
2416 {
2417  int cc;
2418  if (kmr_ckpt_enabled(kvi->c.mr)) {
2419  if (kmr_ckpt_progress_init(kvi, kvo, opt)) {
2420  if (kvo != 0) {
2421  kmr_add_kv_done(kvo);
2422  }
2423  return MPI_SUCCESS;
2424  }
2425  }
2426  struct kmr_option inspect = {
2427  .inspect = 1,
2428  .nothreading = opt.nothreading
2429  };
2430  assert(kvi->c.current_block == 0);
2431  long cnt = kvi->c.element_count;
2432  struct kmr_kv_box *
2433  ev = kmr_malloc(sizeof(struct kmr_kv_box) * (size_t)cnt);
2434  int kcdc = kmr_ckpt_disable_ckpt(kvi->c.mr);
2435  cc = kmr_map(kvi, 0, ev, inspect, kmr_copy_record_fn);
2436  assert(cc == MPI_SUCCESS);
2437  kmr_ckpt_enable_ckpt(kvi->c.mr, kcdc);
2438 
2439  kmr_sorter_t cmp = kmr_choose_sorter(kvi);
2440  long *runs = kmr_malloc(sizeof(long) * (size_t)cnt);
2441  long nruns = 0;
2442  {
2443  long i = 0;
2444 #if 0
2445  if (kmr_ckpt_enabled(kvi->c.mr)) {
2446  i = kmr_ckpt_first_unprocessed_kv(kvi->c.mr);
2447  kmr_ckpt_save_kvo_block_init(kvi->c.mr, kvo);
2448  }
2449 #endif
2450  while (i < cnt) {
2451  do {
2452  i++;
2453  if (i == cnt) {
2454  break;
2455  }
2456  cc = (*cmp)(&ev[i - 1], &ev[i]);
2457  assert(cc <= 0);
2458  } while (cc == 0);
2459  assert(nruns < cnt);
2460  runs[nruns] = i;
2461  nruns++;
2462  }
2463  assert(i == cnt && (cnt == 0 || runs[nruns - 1] == cnt));
2464  }
2465  {
2466  if (kvo != 0) {
2467  kvo->c.under_threaded_operation = 1;
2468  }
2469  KMR *mr = kvi->c.mr;
2470  _Bool skip = 0;
2471  KMR_OMP_PARALLEL_FOR_
2472  for (long k = 0; k < nruns; k++) {
2473  /* (Access to stop is sloppy). */
2474  if (!skip) {
2475  long j = (k == 0 ? 0 : runs[k - 1]);
2476  long i = runs[k];
2477  assert(j < i);
2478  double t0 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
2479  int ccx = (*r)(&ev[j], (i - j), kvi, kvo, arg);
2480  double t1 = ((mr->log_traces == 0) ? 0.0 : MPI_Wtime());
2481  if (ccx != MPI_SUCCESS) {
2482  char ee[80];
2483  snprintf(ee, sizeof(ee),
2484  "Reduce-fn returned with error cc=%d", ccx);
2485  kmr_error(mr, ee);
2486  }
2487  if (mr->log_traces != 0) {
2488  kmr_log_reduce(mr, kvi, ev, (i - j), r, (t1 - t0));
2489  }
2490 #if 0
2491  if (kmr_ckpt_enabled(mr)) {
2492  KMR_OMP_CRITICAL_
2493  {
2494  kmr_ckpt_save_kvo_block_add(mr, kvo, (i - j));
2495  }
2496  }
2497 #endif
2498  if (stop_when_some_added) {
2499  _Bool done;
2500  if (mr->stop_at_some_check_globally) {
2501  done = 0;
2502  } else {
2503  done = (kvo->c.element_count != 0);
2504  }
2505  if (done) {
2506  KMR_OMP_CRITICAL_
2507  {
2508  skip = 1;
2509  }
2510  }
2511  }
2512  }
2513  }
2514  if (kvo != 0) {
2515  kvo->c.under_threaded_operation = 0;
2516  }
2517  }
2518  if (kmr_ckpt_enabled(kvi->c.mr)) {
2519 #if 0
2520  kmr_ckpt_save_kvo_block_fin(mr, kvo);
2521 #endif
2522  kmr_ckpt_save_kvo_whole(kvi->c.mr, kvo);
2523  }
2524  if (kvo != 0) {
2525  kmr_add_kv_done(kvo);
2526  }
2527  kmr_free(runs, (sizeof(long) * (size_t)cnt));
2528  kmr_free(ev, (sizeof(struct kmr_kv_box) * (size_t)cnt));
2529  return MPI_SUCCESS;
2530 }
2531 
/** Reduces key-value pairs.  It does not include shuffling, and thus,
    it requires being preceded by shuffling.  Or, it works on local
    data (as a local combiner), if it is not preceded by shuffling.
    It consumes the input key-value stream KVI unless the INSPECT
    option is given.  An output key-value stream KVO can be null.  It
    passes an array of key-value pairs to a reduce-function whose keys
    are all equal (equality is by bits).  The pointer ARG is just
    passed to a reduce-function as a general argument, where accesses
    to it should be race-free, since a reduce-function is called by
    threads by default.  R is a reduce-function.  See the description
    on the type ::kmr_redfn_t.  A reduce-function may see a different
    input key-value stream (internally created one) instead of the one
    given.  During reduction, it first sorts locally, scans adjacent
    equal keys, then calls a given reduce-function.  When
    STOP_WHEN_SOME_ADDED is true (threading version only), remaining
    groups may be skipped once KVO gets an entry.  FILE, LINE and FUNC
    identify the call site; they are recorded in mr->atwork if no
    outer operation has set it.
    Effective-options: NOTHREADING, INSPECT, TAKE_CKPT.
    See struct kmr_option. */

int
kmr_reduce9(_Bool stop_when_some_added,
            KMR_KVS *kvi, KMR_KVS *kvo,
            void *arg, struct kmr_option opt, kmr_redfn_t r,
            const char *file, const int line, const char *func)
{
    kmr_assert_kvs_ok(kvi, kvo, 1, 0);
    KMR *mr = kvi->c.mr;
    struct kmr_option kmr_supported = {.nothreading = 1, .inspect = 1,
                                       .take_ckpt = 1};
    kmr_check_fn_options(mr, kmr_supported, opt, __func__);
    /* Input-side options go to the sort, output-side to the reducer. */
    struct kmr_option i_opt = kmr_copy_options_i_part(opt);
    struct kmr_option o_opt = kmr_copy_options_o_part(opt);

    kmr_trace_entry_t * kte_start = 0;
    if (mr->kmrviz_trace) {
        kte_start = kmr_trace_add_entry(mr, KMR_TRACE_EVENT_REDUCE, 0,
                                        kvi, kvo);
    }
    if (kmr_ckpt_enabled(mr)) {
        kmr_ckpt_lock_start(mr);
    }

    /* Sort for reduction. */

    enum kmr_kv_field keyf = kmr_unit_sized_or_opaque(kvi->c.key_data);
    enum kmr_kv_field valf = kmr_unit_sized_or_opaque(kvi->c.value_data);
    KMR_KVS *kvs1 = kmr_create_kvs(mr, keyf, valf);

    /* Make checkpoint for kvi and kvs1. */

    kmr_sort_locally_lo(kvi, kvs1, 0, 0, i_opt);
    KMR_DEBUGX(kmr_assert_sorted(kvs1, 1, 0, 0));

    /* Make checkpoint for kvs1 and kvo. */

    /* Record the call site unless an enclosing operation already did;
       it is cleared below only if it was set here. */
    struct kmr_code_line info;
    if (mr->atwork == 0) {
        info.file = file;
        info.func = func;
        info.line = line;
        mr->atwork = &info;
    }
    int cc;
    if (mr->single_thread || opt.nothreading) {
        cc = kmr_reduce_nothreading(kvs1, kvo, arg, o_opt, r);
    } else {
        cc = kmr_reduce_threading(stop_when_some_added,
                                  kvs1, kvo, arg, o_opt, r);
    }
    if (mr->atwork == &info) {
        mr->atwork = 0;
    }

    //kmr_assert_on_tail_marker(kvo);
    kmr_assert_on_tail_marker(kvs1);
    kmr_free_kvs(kvs1);

    if (kmr_ckpt_enabled(mr)) {
        /* NOTE(review): this block is empty in this listing, apparently
           because lines were lost when the source was extracted.  It
           presumably held checkpoint finalization (e.g. the counterpart
           of kmr_ckpt_lock_start() above); confirm against the original
           kmrbase.c before relying on it. */
    }
    if (mr->kmrviz_trace) {
        /* KVI has been consumed unless INSPECT was given. */
        if (!opt.inspect) {
            kvi = 0;
        }
        kmr_trace_add_entry(mr, KMR_TRACE_EVENT_REDUCE, kte_start, kvi, kvo);
    }

    return cc;
}
2619 
2620 /** Calls a reduce-function once as if all key-value pairs had the
2621  same key. See kmr_reduce(). Effective-options: INSPECT, TAKE_CKPT.
2622  See struct kmr_option. */
2623 
2624 int
2626  void *arg, struct kmr_option opt, kmr_redfn_t r)
2627 {
2628  kmr_assert_kvs_ok(kvi, kvo, 1, 0);
2629  KMR *mr = kvi->c.mr;
2630  assert(kvi->c.current_block == 0);
2631  struct kmr_option kmr_supported = {.inspect = 1, .take_ckpt = 1};
2632  kmr_check_fn_options(mr, kmr_supported, opt, __func__);
2633 
2634  if (kmr_ckpt_enabled(mr)) {
2635  if (kmr_ckpt_progress_init(kvi, kvo, opt)) {
2636  if (kvo != 0) {
2637  kmr_add_kv_done(kvo);
2638  }
2639  if (!opt.inspect) {
2640  kmr_free_kvs(kvi);
2641  }
2642  return MPI_SUCCESS;
2643  }
2644  }
2645  int kcdc = kmr_ckpt_disable_ckpt(mr);
2646 
2647  long cnt = kvi->c.element_count;
2648  struct kmr_kv_box *
2649  ev = kmr_malloc(sizeof(struct kmr_kv_box) * (size_t)cnt);
2650 
2651  if (cnt > 0) {
2652  int cc;
2653 
2654  struct kmr_option inspect = {.inspect = 1};
2655  cc = kmr_map(kvi, 0, ev, inspect, kmr_copy_record_fn);
2656  assert(cc == MPI_SUCCESS);
2657 
2658  cc = (*r)(&ev[0], cnt, kvi, kvo, arg);
2659  if (cc != MPI_SUCCESS) {
2660  char ee[80];
2661  snprintf(ee, sizeof(ee),
2662  "Reduce-fn returned with error cc=%d", cc);
2663  kmr_error(mr, ee);
2664  }
2665  }
2666 
2667  kmr_ckpt_enable_ckpt(mr, kcdc);
2668  if (kmr_ckpt_enabled(mr)) {
2669  kmr_ckpt_save_kvo_whole(mr, kvo);
2670  }
2671 
2672  if (kvo != 0) {
2673  kmr_add_kv_done(kvo);
2674  }
2675  kmr_free(ev, (sizeof(struct kmr_kv_box) * (size_t)cnt));
2676  if (!opt.inspect) {
2677  kmr_free_kvs(kvi);
2678  }
2679 
2680  if (kmr_ckpt_enabled(mr)) {
2682  }
2683  return MPI_SUCCESS;
2684 }
2685 
2686 /* ================================================================ */
2687 
2688 /** Concatenates a number of KVSes to one. Inputs are consumed. (It
2689  is fast because the key-value data is stored internally as a list
2690  of data blocks, and this routine just links them). Note that
2691  concatenating KVS can in effect be performed by consecutive calls
2692  to kmr_map() with the KEEP_OPEN option using the same output KVS.
2693  Effective-options: none. */
2694 
2695 int
2696 kmr_concatenate_kvs(KMR_KVS *kvs[], int nkvs, KMR_KVS *kvo,
2697  struct kmr_option opt)
2698 {
2699  for (int i = 0; i < nkvs; i++) {
2700  kmr_assert_i_kvs_ok(kvs[i], 1);
2701  }
2702  kmr_assert_o_kvs_ok(kvo, 1);
2703  if (kvo->c.element_count > 0) {
2704  KMR *mr = kvo->c.mr;
2705  kmr_error(mr, "kmr_concatenate_kvs: Output kvs has entries");
2706  }
2707  kmr_check_fn_options(kvo->c.mr, kmr_noopt, opt, __func__);
2708 
2709  struct kmr_kvs_block *storage = 0;
2710  long elements = 0;
2711  size_t netsize = 0;
2712  long blocks = 0;
2713 
2714  struct kmr_kvs_block *p = 0;
2715  for (int i = 0; i < nkvs; i++) {
2716  elements += kvs[i]->c.element_count;
2717  netsize += kvs[i]->c.storage_netsize;
2718  blocks += kvs[i]->c.block_count;
2719 
2720  struct kmr_kvs_block *bb = kvs[i]->c.first_block;
2721  if (bb != 0) {
2722  kvs[i]->c.first_block = 0;
2723  if (p == 0) {
2724  assert(storage == 0);
2725  p = bb;
2726  storage = bb;
2727  } else {
2728  assert(blocks != 0 && p->next == 0);
2729  p->next = bb;
2730  }
2731  }
2732  if (p != 0) {
2733  while (p->next != 0) {
2734  p = p->next;
2735  }
2736  }
2737  kmr_free_kvs(kvs[i]);
2738  }
2739 
2740  kvo->c.first_block = storage;
2741  kvo->c.element_count = elements;
2742  kvo->c.storage_netsize = netsize;
2743  kvo->c.block_count = blocks;
2744 
2745  /*kmr_add_kv_done(kvo);*/
2746  kvo->c.stowed = 1;
2747  kvo->c.current_block = 0;
2748  kvo->c.adding_point = 0;
2749  assert(kvo->c.block_count == 0 || kvo->c.first_block != 0);
2750 
2751  return MPI_SUCCESS;
2752 }
2753 
2754 /** Finds the last entry of a key-value stream. It returns null when
2755  a key-value stream is empty. It sequentially scans all the
2756  entries and slow. */
2757 
2758 struct kmr_kvs_entry *
2760 {
2761  kmr_assert_kvs_ok(kvs, 0, 1, 0);
2762  assert(kvs->c.magic == KMR_KVS_ONCORE);
2763 #if 0
2764  long cnt = kvs->c.element_count;
2765  kvs->c.current_block = kvs->c.first_block;
2766  struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvs, kvs->c.first_block);
2767  struct kmr_kvs_entry *o = 0;
2768  for (long i = 0; i < cnt && e != 0; i++) {
2769  o = e;
2770  e = kmr_kvs_next(kvs, e, 0);
2771  }
2772  kvs->c.current_block = 0;
2773  return o;
2774 #else
2775  if (kvs->c.element_count == 0) {
2776  return 0;
2777  } else {
2778  struct kmr_kvs_block *b;
2779  for (b = kvs->c.first_block; b->next != 0; b = b->next);
2780  kvs->c.current_block = b;
2781  struct kmr_kvs_entry *e;
2782  struct kmr_kvs_entry *o;
2783  e = kmr_kvs_first_entry(kvs, b);
2784  o = 0;
2785  long cnt = b->partial_element_count;
2786  for (long i = 0; i < cnt && e != 0; i++) {
2787  o = e;
2788  e = kmr_kvs_next(kvs, e, 1);
2789  }
2790  kvs->c.current_block = 0;
2791  return o;
2792  }
2793 #endif
2794 }
2795 
2796 /** Fills local key-value entries in an array for inspection. The
2797  returned pointers point to the inside of the KVS. The array EV
2798  should be as large as N. It implies inspect. */
2799 
2800 int
2802 {
2803  kmr_assert_kvs_ok(kvs, 0, 1, 0);
2804  assert(kvs->c.magic == KMR_KVS_ONCORE);
2805  long cnt = MIN(n, kvs->c.element_count);
2806  kvs->c.current_block = kvs->c.first_block;
2807  struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvs, kvs->c.first_block);
2808  for (long i = 0; i < cnt && e != 0; i++) {
2809  ev[i] = e;
2810  e = kmr_kvs_next(kvs, e, 0);
2811  }
2812  kvs->c.current_block = 0;
2813  return MPI_SUCCESS;
2814 }
2815 
2816 /** Fills keyed records in an array for sorting. The array EV should
2817  be as large as N. It implies inspect. */
2818 
2819 int
2821  long n, _Bool shuffling, _Bool ranking)
2822 {
2823  kmr_assert_kvs_ok(kvs, 0, 1, 0);
2824  assert(kvs->c.magic == KMR_KVS_ONCORE);
2825  long cnt = MIN(n, kvs->c.element_count);
2826  kvs->c.current_block = kvs->c.first_block;
2827  struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvs, kvs->c.first_block);
2828  for (long i = 0; i < cnt && e != 0; i++) {
2829  struct kmr_kv_box kv = kmr_pick_kv(e, kvs);
2830  if (shuffling) {
2831  ev[i].v = (ranking ? kv.k.i : kmr_pitch_rank(kv, kvs));
2832  ev[i].e = e;
2833  } else {
2834  ev[i].v = kmr_stable_key(kv, kvs);
2835  ev[i].e = e;
2836  }
2837  e = kmr_kvs_next(kvs, e, 0);
2838  }
2839  kvs->c.current_block = 0;
2840  return MPI_SUCCESS;
2841 }
2842 
2843 /** Returns a minimum byte size of the field: 8 for INTEGER and
2844  FLOAT8, 0 for others. */
2845 
2846 int
2848 {
2849  switch (f) {
2850  case KMR_KV_BAD:
2851  kmr_error(mr, "kmr_legal_minimum_field_size: Bad field");
2852  return 0;
2853  case KMR_KV_INTEGER:
2854  return sizeof(long);
2855  case KMR_KV_FLOAT8:
2856  return sizeof(double);
2857  case KMR_KV_OPAQUE:
2858  case KMR_KV_CSTRING:
2859  case KMR_KV_POINTER_OWNED:
2860  case KMR_KV_POINTER_UNMANAGED:
2861  return 0;
2862  default:
2863  kmr_error(mr, "kmr_legal_minimum_field_size: Bad field");
2864  return 0;
2865  }
2866 }
2867 
2868 /** Scans every key-value with a reduce-function locally
2869  (independently on each rank). It works in the order in the KVS.
2870  It ignores differences of the keys. It gets the start value from
2871  CARRYIN and puts the final value to CARRYOUT. The output has the
2872  same number of entries as the input. The carry-in and carry-out
2873  have one entry. The carry-out can be null. The reduce-function
2874  is called on each key-value pair as the right operand with the
2875  previous value as the left operand, and it should output a single
2876  value. The key part of the output is ignored and a pair is stored
2877  under the original key. */
2878 
2879 int
2881  KMR_KVS *kvo, KMR_KVS *carryout, kmr_redfn_t r)
2882 {
2883  int cc;
2884  KMR *mr = kvo->c.mr;
2885  enum kmr_kv_field keyf = kvi->c.key_data;
2886  enum kmr_kv_field valf = kvi->c.value_data;
2887 
2888  long cnt = kvi->c.element_count;
2889  size_t evsz = (sizeof(struct kmr_keyed_record) * (size_t)cnt);
2890  struct kmr_keyed_record *ev = kmr_malloc(evsz);
2891  cc = kmr_retrieve_keyed_records(kvi, ev, cnt, 0, 0);
2892  assert(cc == MPI_SUCCESS);
2893 
2894  KMR_KVS *lastvalue = carryin;
2895  for (long i = 0; i < cnt; i++) {
2896  struct kmr_kv_box kv;
2897  cc = kmr_take_one(lastvalue, &kv);
2898  assert(cc == MPI_SUCCESS);
2899  struct kmr_kv_box bx[2];
2900  bx[0] = kv;
2901  bx[1] = kmr_pick_kv(ev[i].e, kvi);
2902  KMR_KVS *xs = kmr_create_kvs(mr, keyf, valf);
2903  cc = (*r)(bx, 2, kvi, xs, 0);
2904  if (cc != MPI_SUCCESS) {
2905  char ee[80];
2906  snprintf(ee, sizeof(ee),
2907  "Reduce-fn returned with error cc=%d", cc);
2908  kmr_error(mr, ee);
2909  }
2910  cc = kmr_add_kv_done(xs);
2911  assert(cc == MPI_SUCCESS);
2912  /* Put the last value as it is a non-inclusive scan. */
2913  bx[0].klen = bx[1].klen;
2914  bx[0].k = bx[1].k;
2915  cc = kmr_add_kv(kvo, bx[0]);
2916  assert(cc == MPI_SUCCESS);
2917  kmr_free_kvs(lastvalue);
2918  lastvalue = xs;
2919  }
2920  cc = kmr_add_kv_done(kvo);
2921  assert(cc == MPI_SUCCESS);
2922 
2923  if (carryout != 0) {
2924  struct kmr_kv_box kv;
2925  cc = kmr_take_one(lastvalue, &kv);
2926  assert(cc == MPI_SUCCESS);
2927  cc = kmr_add_kv(carryout, kv);
2928  assert(cc == MPI_SUCCESS);
2929  cc = kmr_add_kv_done(carryout);
2930  assert(cc == MPI_SUCCESS);
2931  }
2932  kmr_free_kvs(lastvalue);
2933  kmr_free_kvs(kvi);
2934 
2935  if (ev != 0) {
2936  kmr_free(ev, evsz);
2937  }
2938 
2939  return MPI_SUCCESS;
2940 }
2941 
2942 /*
2943 Copyright (C) 2012-2016 RIKEN AICS
2944 This library is distributed WITHOUT ANY WARRANTY. This library can be
2945 redistributed and/or modified under the terms of the BSD 2-Clause License.
2946 */
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
Definition: kmrbase.c:621
int kmr_ckpt_progress_init(KMR_KVS *, KMR_KVS *, struct kmr_option)
It initializes a progress of MapReduce checkpointing.
Definition: kmrckpt.c:2753
Key-Value Stream (abstract).
Definition: kmr.h:587
int kmr_add_kv1(KMR_KVS *kvs, void *k, int klen, void *v, int vlen)
Adds a key-value pair as given directly by a pointer.
Definition: kmrbase.c:779
int kmr_save_kvs(KMR_KVS *kvs, void **dataq, size_t *szq, struct kmr_option opt)
Packs locally the contents of a key-value stream to a byte array.
Definition: kmrbase.c:968
kmr_trace_entry_t * kmr_trace_add_entry(KMR *mr, kmr_trace_event_t ev, kmr_trace_entry_t *pre, KMR_KVS *kvi, KMR_KVS *kvo)
Add an entry to the trace.
Definition: kmrtrace.c:174
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
Definition: kmrbase.c:2036
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *identifying_name)
Makes a new KMR context (a context has type KMR).
Definition: kmrbase.c:147
Utilities Private Part (do not include from applications).
int kmr_add_identity_fn(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long i)
Adds a given key-value pair unmodified.
Definition: kmrbase.c:937
Options to Mapping, Shuffling, and Reduction.
Definition: kmr.h:613
void kmr_ckpt_progress_fin(KMR *)
It finalizes the progress of MapReduce checkpointing.
Definition: kmrckpt.c:2845
static KMR_KVS * kmr_create_raw_kvs(KMR *mr, const KMR_KVS *_similar)
Makes a new key-value stream (type KMR_KVS).
Definition: kmrbase.c:418
void kmr_ckpt_restore_ckpt(KMR_KVS *)
It restores checkpoint data to kvs.
Definition: kmrckpt.c:2557
int kmr_map_skipping(long from, long stride, long limit, _Bool stop_when_some_added, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps by skipping the number of entries.
Definition: kmrbase.c:1134
int kmr_add_string(KMR_KVS *kvs, const char *k, const char *v)
Adds a key-value pair of strings.
Definition: kmrbase.c:913
static int kmr_icmp(const void *a0, const void *a1)
Compares the key field of keyed-records for qsort/bsearch.
Definition: kmrbase.c:1815
#define kmr_malloc(Z)
Allocates memory, or aborts when failed.
Definition: kmrimpl.h:177
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
Definition: kmr.h:71
int kmr_sort_locally(KMR_KVS *kvi, KMR_KVS *kvo, _Bool shuffling, struct kmr_option opt)
Reorders key-value pairs in a single rank.
Definition: kmrbase.c:1993
Key-Value Stream.
Definition: kmr.h:458
int kmr_map9(_Bool stop_when_some_added, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m, const char *file, const int line, const char *func)
Maps simply.
Definition: kmrbase.c:1289
int kmr_ckpt_enabled(KMR *)
Check if checkpoint/restart is enabled.
Definition: kmrckpt.c:2478
static const size_t kmr_kvs_entry_header
Size of an Entry Header.
Definition: kmr.h:382
Keyed-Record for Sorting.
Definition: kmr.h:372
int kmr_add_kv_space(KMR_KVS *kvs, const struct kmr_kv_box kv, void **keyp, void **valuep)
Adds a key-value pair, but only allocates a space and returns the pointers to the key and the value p...
Definition: kmrbase.c:843
long kmr_ckpt_first_unprocessed_kv(KMR *)
It returns the index of the first unprocessed key-value in the input KVS.
Definition: kmrckpt.c:2535
int kmr_move_kvs(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Moves the contents of the input KVI to the output KVO.
Definition: kmrbase.c:534
Definition: kmr.h:348
int kmr_map_rank_by_rank(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps sequentially with rank by rank for debugging.
Definition: kmrbase.c:1339
KMR Context.
Definition: kmr.h:222
struct kmr_kvs_entry * kmr_find_kvs_last_entry(KMR_KVS *kvs)
Finds the last entry of a key-value stream.
Definition: kmrbase.c:2759
void kmr_ckpt_save_kvo_whole(KMR *, KMR_KVS *)
It saves all key-value pairs in the output KVS to a checkpoint data file.
Definition: kmrckpt.c:2638
Definition: kmrtrace.h:27
int kmr_allgatherv(KMR *mr, _Bool rankzeroonly, void *sbuf, long scnt, void *rbuf, long *rcnts, long *rdsps)
All-gathers data, or gathers data when RANKZEROONLY.
Definition: kmratoa.c:70
void kmr_ckpt_save_kvo_block_add(KMR *, KMR_KVS *, long)
It adds a new block of key-value pairs of the output KVS to the checkpoint data file.
Definition: kmrckpt.c:2670
int kmr_map_on_rank_zero(KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps on rank0 only.
Definition: kmrbase.c:1456
kmr_kv_field
Datatypes of Keys or Values.
Definition: kmr.h:325
void kmr_isort(void *a, size_t n, size_t es, int depth)
Sorts by comparator on long integers.
Definition: kmrisort.c:292
int kmr_take_one(KMR_KVS *kvi, struct kmr_kv_box *kv)
Extracts a single key-value pair locally in the key-value stream KVI.
Definition: kmrbase.c:1369
int kmr_concatenate_kvs(KMR_KVS *kvs[], int nkvs, KMR_KVS *kvo, struct kmr_option opt)
Concatenates a number of KVSes to one.
Definition: kmrbase.c:2696
void kmr_ckpt_create_context(KMR *)
Initialize checkpoint context.
Definition: kmrckpt.c:119
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Definition: kmr.h:82
Handy Copy of a Key-Value Field.
Definition: kmr.h:358
int kmr_reduce9(_Bool stop_when_some_added, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_redfn_t r, const char *file, const int line, const char *func)
Reduces key-value pairs.
Definition: kmrbase.c:2549
Options to Mapping by Spawns.
Definition: kmr.h:662
void kmr_ckpt_save_kvo_block_fin(KMR *, KMR_KVS *)
It finalizes saving block of key-value pairs of the output KVS to the checkpoint data file...
Definition: kmrckpt.c:2685
State during kmr_map_ms().
Definition: kmr.h:408
void kmr_ckpt_free_context(KMR *)
Free checkpoint context.
Definition: kmrckpt.c:162
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
Definition: kmrbase.c:881
int kmr_ckpt_disable_ckpt(KMR *)
It temporarily disables checkpoint/restart.
Definition: kmrckpt.c:2494
int kmr_add_kv_pushoff(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
Definition: kmraltkvs.c:460
void kmr_trace_initialize(KMR *mr)
Initializes a trace.
Definition: kmrtrace.c:153
int kmr_assert_sorted(KMR_KVS *kvi, _Bool locally, _Bool shuffling, _Bool ranking)
Checks whether a key-value stream is sorted.
Definition: kmrutil.c:702
void kmr_trace_finalize(KMR *mr)
Finalizes a trace.
Definition: kmrtrace.c:162
void kmr_ckpt_save_kvo_block_init(KMR *, KMR_KVS *)
It initializes saving blocks of key-value pairs of the output KVS to a checkpoint data file...
Definition: kmrckpt.c:2654
int kmr_retrieve_keyed_records(KMR_KVS *kvs, struct kmr_keyed_record *ev, long n, _Bool shuffling, _Bool ranking)
Fills keyed records in an array for sorting.
Definition: kmrbase.c:2820
#define xassert(X)
Asserts and aborts, but it cannot be disabled.
Definition: kmrdp.cpp:51
#define kmr_realloc(P, Z)
Allocates memory, or aborts when failed.
Definition: kmrimpl.h:181
int kmr_scan_locally(KMR_KVS *kvi, KMR_KVS *carryin, KMR_KVS *kvo, KMR_KVS *carryout, kmr_redfn_t r)
Scans every key-value with a reduce-function locally (independently on each rank).
Definition: kmrbase.c:2880
int kmr_retrieve_kvs_entries(KMR_KVS *kvs, struct kmr_kvs_entry **ev, long n)
Fills local key-value entries in an array for inspection.
Definition: kmrbase.c:2801
KMR Interface.
int kmr_add_kv_done_pushoff(KMR_KVS *kvs)
Marks finished adding key-value pairs, called from kmr_add_kv_done().
Definition: kmraltkvs.c:555
Options to Mapping on Files.
Definition: kmr.h:638
Unit-Sized Storage.
Definition: kmr.h:340
int kmr_fin(void)
Clears the environment.
Definition: kmrbase.c:124
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gat...
Definition: kmrbase.c:2182
static struct kmr_kv_box kmr_pick_kv(struct kmr_kvs_entry *e, KMR_KVS *kvs)
Returns a handle to a key-value entry – a reverse of kmr_poke_kv().
Definition: kmrimpl.h:551
int(* kmr_redfn_t)(const struct kmr_kv_box kv[], const long n, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg)
Reduce-function Type.
Definition: kmr.h:700
int kmr_gather_sizes(KMR *mr, long siz, long *rbuf)
Calls all-gather for collecting one long-integer.
Definition: kmratoa.c:58
int kmr_map_once(KMR_KVS *kvo, void *arg, struct kmr_option opt, _Bool rank_zero_only, kmr_mapfn_t m)
Maps once.
Definition: kmrbase.c:1402
int kmr_ckpt_enable_ckpt(KMR *, int)
It temporarily re-enables checkpoint/restart that has been disabled by calling kmr_ckpt_disable_ckpt().
Definition: kmrckpt.c:2515
N-Tuple Argument.
Definition: kmr.h:740
static void kmr_poke_kv(struct kmr_kvs_entry *e, const struct kmr_kv_box kv, struct kmr_kv_box *xkv, const KMR_KVS *kvs, _Bool reserve_space_only)
Stores a key-value pair at the entry E in the store – a reverse of kmr_pick_kv().
Definition: kmrimpl.h:599
Information of Source Code Line.
Definition: kmr.h:107
int(* kmr_mapfn_t)(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long index)
Map-function Type.
Definition: kmr.h:689
void kmr_ckpt_lock_start(KMR *)
Defines the start position of the code region that is referred to on restart.
Definition: kmrckpt.c:1934
int kmr_copy_mpi_info(MPI_Info src, MPI_Info dst)
Copies contents of MPI_Info.
Definition: kmrutil.c:2238
KMR_KVS * kmr_create_kvs7(KMR *mr, enum kmr_kv_field kf, enum kmr_kv_field vf, struct kmr_option opt, const char *file, const int line, const char *func)
Makes a new key-value stream with the specified field data-types.
Definition: kmrbase.c:510
int kmr_alltoallv(KMR *mr, void *sbuf, long *scounts, long *sdsps, void *rbuf, long *rcounts, long *rdsps)
Does all-to-all-v, but takes its counts and displacements as long-integer arguments.
Definition: kmratoa.c:116
int kmr_restore_kvs(KMR_KVS *kvo, void *data, size_t sz_, struct kmr_option opt)
Unpacks locally the contents of a key-value stream from a byte array.
Definition: kmrbase.c:1034
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
Definition: kmrbase.c:751
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
Definition: kmrbase.c:326
int kmr_reduce_as_one(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_redfn_t r)
Calls a reduce-function once as if all key-value pairs had the same key.
Definition: kmrbase.c:2625
void kmr_ckpt_remove_ckpt(KMR_KVS *)
It removes checkpoint data file.
Definition: kmrckpt.c:2612
void kmr_ckpt_lock_finish(KMR *)
Defines the end position of the code region that is referred to on restart.
Definition: kmrckpt.c:1945
KMRViz tracing Support.
int kmr_exchange_sizes(KMR *mr, long *sbuf, long *rbuf)
Calls all-to-all to exchange one long-integer.
Definition: kmratoa.c:46
int kmr_legal_minimum_field_size(KMR *mr, enum kmr_kv_field f)
Returns a minimum byte size of the field: 8 for INTEGER and FLOAT8, 0 for others. ...
Definition: kmrbase.c:2847