KMR
kmrckpt.c
Go to the documentation of this file.
1 /* kmrckpt.c (2014-04-01) */
2 /* Copyright (C) 2012-2016 RIKEN AICS */
3 
4 /** \file kmrckpt.c Checkpoint/Restart Support. */
5 
6 #include <mpi.h>
7 #include <assert.h>
8 #include <stddef.h>
9 #include <stdlib.h>
10 #include <stdio.h>
11 #include <unistd.h>
12 #include <dirent.h>
13 #include <sys/types.h>
14 #include <sys/stat.h>
15 #include <string.h>
16 #include <fcntl.h>
17 #include <errno.h>
18 #include <limits.h>
19 
20 #ifdef _OPENMP
21 #include <omp.h>
22 #endif
23 #include "../config.h"
24 #include "kmr.h"
25 #include "kmrimpl.h"
26 #include "kmrckpt.h"
27 
28 /* Functions for initialization */
29 static void kmr_ckpt_init_environment(KMR *);
30 static int kmr_ckpt_check_restart(KMR *, int **, int *, int *);
31 static void kmr_ckpt_restore_prev_progress(KMR *, int *, int);
32 static void kmr_ckpt_restore_prev_state(KMR *, const char *, int*, int, int);
33 static void kmr_ckpt_restore_prev_state_each_rank(KMR *,
34  struct kmr_ckpt_prev_state *,
35  struct kmr_ckpt_merge_ctx *);
36 static int kmr_ckpt_merge_check_ignorable(struct kmr_ckpt_kvs_chains *, long);
37 static void kmr_ckpt_merge_ignore_ckpt_data(long,
38  struct kmr_ckpt_prev_state *,
39  struct kmr_ckpt_merge_ctx *);
40 static void kmr_ckpt_merge_store_ckpt_data(long, int, long,
41  struct kmr_ckpt_prev_state *,
42  struct kmr_ckpt_merge_ctx *);
43 static void kmr_ckpt_merge_update_ckpt_data(long, int, long, long,
44  struct kmr_ckpt_list *,
45  struct kmr_ckpt_prev_state *,
46  struct kmr_ckpt_merge_ctx *);
47 static void kmr_ckpt_merge_sort_data(KMR *, const char *, long,
48  struct kmr_ckpt_merge_source *);
49 static void kmr_ckpt_merge_write_file(KMR *, const char *,
50  struct kmr_ckpt_merge *);
51 /* Functions for logging */
52 static void kmr_ckpt_init_log(KMR *, const char *);
53 static void kmr_ckpt_fin_log(KMR *);
54 static FILE *kmr_ckpt_open_log(KMR *, const char *, struct kmr_ckpt_log *,
55  unsigned long *);
56 static void kmr_ckpt_log_whole_start(KMR *);
57 static void kmr_ckpt_log_whole_finish(KMR *);
58 static void kmr_ckpt_log_block_start(KMR *, KMR_KVS *);
59 static void kmr_ckpt_log_block_add(KMR *, long, long);
60 static void kmr_ckpt_log_block_finish(KMR *);
61 static void kmr_ckpt_log_index_start(KMR *, KMR_KVS *);
62 static void kmr_ckpt_log_index_add(KMR *, long, long);
63 static void kmr_ckpt_log_index_finish(KMR *);
64 static void kmr_ckpt_log_delete_start(KMR *, long);
65 static void kmr_ckpt_log_delete_finish(KMR *, long);
66 static void kmr_ckpt_log_deletable(KMR *, long );
67 static void kmr_ckpt_log_progress(KMR *);
68 static void kmr_ckpt_log_skipped(KMR *);
69 /* Functions for checkpoint data management */
70 static void kmr_ckpt_delete_ckpt_data(KMR *, long);
71 static void kmr_ckpt_delete_ckpt_files(KMR *, const char *, int);
72 static void kmr_ckpt_save_ckpt(KMR_KVS *);
73 static void kmr_ckpt_kv_record_init(KMR *, KMR_KVS *);
74 static long kmr_ckpt_kv_record_add(KMR_KVS *);
75 static void kmr_ckpt_kv_record_fin(KMR *);
76 static FILE *kmr_ckpt_open(KMR_KVS *, const char *);
77 static FILE *kmr_ckpt_open_path(KMR *, const char *, const char *);
78 static void kmr_ckpt_save_nprocs(KMR *, const char *);
79 static void kmr_ckpt_make_fname(const char *, const char *,
80  enum kmr_ckpt_type, int, long, char *, size_t);
81 static void kmr_ckpt_get_data_flist(KMR *, const char *,
82  struct kmr_ckpt_data_file **, int *,
83  _Bool);
84 static void kmr_ckpt_flush(KMR *, FILE *);
85 /* Utility functions */
86 static void kmr_ckpt_list_init(struct kmr_ckpt_list *, kmr_ckpt_list_alocfn_t,
87  kmr_ckpt_list_freefn_t, kmr_ckpt_list_compfn_t);
88 static void kmr_ckpt_list_free(struct kmr_ckpt_list *);
89 static void kmr_ckpt_list_add(struct kmr_ckpt_list *, void *);
90 static void *kmr_ckpt_list_del(struct kmr_ckpt_list *, void *);
91 static void *kmr_ckpt_list_search(struct kmr_ckpt_list *, void *);
92 static void *kmr_ckpt_list_rsearch(struct kmr_ckpt_list *, void *);
93 static void kmr_ckpt_int_list_init(struct kmr_ckpt_list *);
94 static void kmr_ckpt_int_list_free(struct kmr_ckpt_list *);
95 static void kmr_ckpt_int_list_add(struct kmr_ckpt_list *, long);
96 static long kmr_ckpt_int_list_del(struct kmr_ckpt_list *, long);
97 static long kmr_ckpt_int_list_search(struct kmr_ckpt_list *, long);
98 static long kmr_ckpt_int_list_rsearch(struct kmr_ckpt_list *, long);
99 static void kmr_ckpt_opr_list_init(struct kmr_ckpt_list *);
100 static void kmr_ckpt_opr_list_free(struct kmr_ckpt_list *);
101 static void kmr_ckpt_opr_list_add(struct kmr_ckpt_list *,
102  struct kmr_ckpt_operation);
103 static void kmr_ckpt_kvs_chains_init(struct kmr_ckpt_kvs_chains *);
104 static void kmr_ckpt_kvs_chains_free(struct kmr_ckpt_kvs_chains *);
105 static void kmr_ckpt_kvs_chains_new_chain(struct kmr_ckpt_kvs_chains *,
106  struct kmr_ckpt_operation);
107 static void kmr_ckpt_kvs_chains_connect(struct kmr_ckpt_kvs_chains *,
108  struct kmr_ckpt_operation);
109 static struct kmr_ckpt_list *
110 kmr_ckpt_kvs_chains_find(struct kmr_ckpt_kvs_chains *, long);
111 
112 
113 /** Initialize checkpoint context. This function should be called only once
114  when MapReduce data type is initialized.
115 
116  \param[in] mr MapReduce data type
117 */
118 void
120 {
121  struct kmr_ckpt_ctx *
122  ckptctx = kmr_malloc(sizeof(struct kmr_ckpt_ctx));
123  mr->ckpt_ctx = ckptctx;
124  snprintf(ckptctx->ckpt_dname, KMR_CKPT_DIRLEN, "./%s%05d",
125  KMR_CKPT_DIRNAME, mr->rank);
126  ckptctx->prev_mode = KMR_CKPT_ALL;
127  ckptctx->ckpt_log_fp = NULL;
128  ckptctx->progress_counter = 0;
129  ckptctx->prev_progress = 0;
130  ckptctx->prev_global_progress = 0;
131  ckptctx->cur_kvi_id = KMR_CKPT_DUMMY_ID;
132  ckptctx->cur_kvo_id = KMR_CKPT_DUMMY_ID;
133  ckptctx->ckpt_data_fp = NULL;
134  ckptctx->saved_element_count = 0;
135  ckptctx->saved_adding_point = NULL;
136  ckptctx->saved_current_block = NULL;
137  ckptctx->kv_positions = NULL;
138  ckptctx->kv_positions_count = 0;
139  ckptctx->lock_id = 0;
140  ckptctx->lock_counter = 0;
141  ckptctx->initialized = 0;
142  ckptctx->slct_cur_take_ckpt = 0;
143  if (mr->ckpt_selective) {
144  ckptctx->slct_skip_ops = (struct kmr_ckpt_list *)
145  kmr_malloc(sizeof(struct kmr_ckpt_list));
146  kmr_ckpt_int_list_init(ckptctx->slct_skip_ops);
147  } else {
148  ckptctx->slct_skip_ops = NULL;
149  }
150 
151  if (mr->ckpt_enable) {
152  kmr_ckpt_init_environment(mr);
153  }
154 }
155 
156 /** Free checkpoint context. This function should be called only once
157  when MapReduce data type is freed.
158 
159  \param[in] mr MapReduce data type
160 */
161 void
163 {
164  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
165  if (kmr_ckpt_enabled(mr)) {
166  MPI_Barrier(mr->comm);
167  kmr_ckpt_fin_log(mr);
168  kmr_ckpt_delete_ckpt_files(mr, ckptctx->ckpt_dname, mr->rank);
169  kmr_free(ckptctx->kv_positions,
170  sizeof(struct kv_position) * (size_t)ckptctx->kv_positions_count);
171  }
172  if (mr->ckpt_selective) {
173  kmr_ckpt_int_list_free(ckptctx->slct_skip_ops);
174  kmr_free(ckptctx->slct_skip_ops, sizeof(struct kmr_ckpt_list));
175  }
176  kmr_free(ckptctx, sizeof(struct kmr_ckpt_ctx));
177  mr->ckpt_ctx = 0;
178 }
179 
180 /***************************************************************/
181 /* Functions for initializing checkpoint/restart environment */
182 /***************************************************************/
183 
184 /* Initialize checkpoint environment */
185 static void
186 kmr_ckpt_init_environment(KMR *mr)
187 {
188  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
189  if (ckptctx->initialized) {
190  return;
191  }
192 
193  /* Check if restarted or not */
194  int *prev_ranks = NULL;
195  int prev_rank_count = 0;
196  int prev_nprocs = 0;
197  int restarted = kmr_ckpt_check_restart(mr, &prev_ranks, &prev_rank_count,
198  &prev_nprocs);
199  {
200  int all_restarted;
201  int cc = MPI_Allreduce(&restarted, &all_restarted, 1, MPI_INT,
202  MPI_LAND, mr->comm);
203  assert(cc == MPI_SUCCESS);
204  assert(restarted == all_restarted);
205  }
206 
207  /* Create a temporal directory for checkpoint files */
208  char tmp_dname[KMR_CKPT_DIRLEN];
209  snprintf(tmp_dname, KMR_CKPT_DIRLEN, "./tmp_%s%05d",
210  KMR_CKPT_DIRNAME, mr->rank);
211  kmr_ckpt_delete_ckpt_files(mr, tmp_dname, mr->rank);
212  int cc = mkdir(tmp_dname, S_IRWXU);
213  if (cc != 0) {
214  char msg[KMR_CKPT_MSGLEN];
215  snprintf(msg, sizeof(msg),
216  "Failed to create a directory for checkpoint %s", tmp_dname);
217  kmr_error(mr, msg);
218  }
219 
220  /* Load checkpoint files to restart */
221  if (restarted) {
222  kmr_ckpt_restore_prev_progress(mr, prev_ranks, prev_rank_count);
223  kmr_ckpt_restore_prev_state(mr, tmp_dname, prev_ranks, prev_rank_count,
224  prev_nprocs);
225  }
226 
227  /* Initialize a log file */
228  char log_fpath[KMR_CKPT_PATHLEN];
229  kmr_ckpt_make_fname(tmp_dname, KMR_CKPT_FNAME_PREFIX, KMR_CKPT_LOG,
230  mr->rank, 0, log_fpath, sizeof(log_fpath));
231  kmr_ckpt_init_log(mr, log_fpath);
232 
233  /* save nprocs to file */
234  if (mr->rank == 0) {
235  kmr_ckpt_save_nprocs(mr, tmp_dname);
236  }
237 
238  /* Rename directories */
239  for (int i = 0; i < prev_rank_count; i++) {
240  char old_dname[KMR_CKPT_DIRLEN];
241  snprintf(old_dname, KMR_CKPT_DIRLEN, "./%s%05d.old",
242  KMR_CKPT_DIRNAME, prev_ranks[i]);
243  kmr_ckpt_delete_ckpt_files(mr, old_dname, prev_ranks[i]);
244  char cur_dname[KMR_CKPT_DIRLEN];
245  snprintf(cur_dname, KMR_CKPT_DIRLEN, "./%s%05d",
246  KMR_CKPT_DIRNAME, prev_ranks[i]);
247  struct stat sb;
248  cc = stat(cur_dname, &sb);
249  if (cc == 0) {
250  cc = rename(cur_dname, old_dname);
251  assert(cc == 0);
252  }
253  }
254  MPI_Barrier(mr->comm);
255  cc = rename(tmp_dname, ckptctx->ckpt_dname);
256  assert(cc == 0);
257 
258  if (restarted) {
259  kmr_free(prev_ranks, sizeof(int) * (size_t)prev_rank_count);
260  }
261  ckptctx->initialized = 1;
262 }
263 
264 /* Check if this run is restarted or not.
265  It also finds target checkpoint files of the previous run if restarted. */
266 static int
267 kmr_ckpt_check_restart(KMR *mr, int **target_ranks, int *target_rank_count,
268  int *target_nprocs)
269 {
270  int restarted = 0;
271  _Bool force_start_from_scratch = 0;
272  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
273  struct stat sb;
274  int cc = stat(ckptctx->ckpt_dname, &sb);
275  if (cc == 0) {
276  if (!S_ISDIR(sb.st_mode)) {
277  char msg[KMR_CKPT_MSGLEN];
278  snprintf(msg, sizeof(msg),
279  "Non-directory file for checkpoint directory %s "
280  "already exist",
281  ckptctx->ckpt_dname);
282  kmr_error(mr, msg);
283  }
284  /* Read this rank's log and find target ranks in the previous run */
285  char fpath[KMR_CKPT_PATHLEN];
286  kmr_ckpt_make_fname(ckptctx->ckpt_dname, KMR_CKPT_FNAME_PREFIX,
287  KMR_CKPT_LOG, mr->rank, 0, fpath, sizeof(fpath));
288  cc = access(fpath, R_OK);
289  if (cc == 0) {
290  struct kmr_ckpt_log log_hdr;
291  unsigned long log_size = 0;
292  FILE *fp = kmr_ckpt_open_log(mr, fpath, &log_hdr, &log_size);
293  fclose(fp);
294  assert(mr->rank == log_hdr.rank);
295  assert(log_hdr.nprocs > 0);
296  if (log_size == 0) {
297  force_start_from_scratch = 1;
298  char msg[KMR_CKPT_MSGLEN];
299  snprintf(msg, sizeof(msg),
300  "Log file exists, but no log is recorded in %s. "
301  "All logs are discarded and start from scratch",
302  fpath);
303  kmr_warning(mr, 1, msg);
304  }
305  int quotient = log_hdr.nprocs / mr->nprocs;
306  int rest = log_hdr.nprocs % mr->nprocs;
307  int cnt = quotient + ((mr->rank < rest) ? 1 : 0);
308  if (cnt != 0) {
309  *target_ranks = (int*)kmr_malloc(sizeof(int) * (size_t)cnt);
310  int offset = mr->rank * quotient +
311  ((mr->rank < rest) ? mr->rank : rest);
312  for (int i = 0; i < cnt; i++) {
313  (*target_ranks)[i] = offset + i;
314  }
315  }
316  *target_rank_count = cnt;
317  *target_nprocs = log_hdr.nprocs;
318  if (mr->nprocs > log_hdr.nprocs) {
319  // TODO support future
320  char msg[KMR_CKPT_MSGLEN];
321  snprintf(msg, sizeof(msg),
322  "Currently restart with bigger number of processes "
323  "is not supported");
324  kmr_error(mr, msg);
325  }
326  if (mr->ckpt_selective && mr->nprocs != log_hdr.nprocs) {
327  char msg[KMR_CKPT_MSGLEN];
328  snprintf(msg, sizeof(msg),
329  "Restart with different number of processes "
330  "is not supported in selective mode");
331  kmr_error(mr, msg);
332  }
333  ckptctx->prev_mode = log_hdr.mode;
334  } else {
335  char msg[KMR_CKPT_MSGLEN];
336  snprintf(msg, sizeof(msg),
337  "Structure of a checkpoint directory may be wrong %s. "
338  "Delete all checkpoint directories",
339  ckptctx->ckpt_dname);
340  kmr_error(mr, msg);
341  }
342  } else {
343  if (errno != ENOENT) {
344  char msg[KMR_CKPT_MSGLEN];
345  snprintf(msg, sizeof(msg),
346  "Unknown error on checkpoint directory %s",
347  ckptctx->ckpt_dname);
348  kmr_error(mr, msg);
349  }
350  assert(*target_rank_count == 0);
351  }
352 
353  /* Check consistency of target checkpoint log files to decide restart */
354  if (*target_rank_count > 0) {
355  for (int i = 1; i < *target_rank_count; i++) {
356  _Bool success = 1;
357  int t_rank = (*target_ranks)[i];
358  char dpath[KMR_CKPT_DIRLEN];
359  snprintf(dpath, KMR_CKPT_DIRLEN, "./%s%05d", KMR_CKPT_DIRNAME,
360  t_rank);
361  char fpath[KMR_CKPT_PATHLEN];
362  kmr_ckpt_make_fname(dpath, KMR_CKPT_FNAME_PREFIX, KMR_CKPT_LOG,
363  t_rank, 0, fpath, sizeof(fpath));
364  cc = access(fpath, R_OK);
365  if (cc == 0) {
366  struct kmr_ckpt_log log_hdr;
367  unsigned long log_size = 0;
368  FILE *fp = kmr_ckpt_open_log(mr, fpath, &log_hdr, &log_size);
369  fclose(fp);
370  if (log_hdr.nprocs < 0) {
371  success = 0;
372  }
373  if (log_size == 0) {
374  force_start_from_scratch = 1;
375  char msg[KMR_CKPT_MSGLEN];
376  snprintf(msg, sizeof(msg),
377  "Log file exists, but no log is recorded in %s. "
378  "All logs are discarded and start from scratch",
379  fpath);
380  kmr_warning(mr, 1, msg);
381  }
382  } else {
383  success = 0;
384  }
385  if (!success) {
386  kmr_free(*target_ranks, (size_t)*target_rank_count);
387  char msg[KMR_CKPT_MSGLEN];
388  snprintf(msg, sizeof(msg),
389  "Wrong structure of checkpoint directory %s. ",
390  dpath);
391  kmr_error(mr, msg);
392  }
393  }
394  if (!force_start_from_scratch) {
395  restarted = 1;
396  }
397  }
398  return restarted;
399 }
400 
401 /* Restore MapReduce progress of the previous run for all mode. */
402 static void
403 kmr_ckpt_restore_prev_progress_all(KMR *mr,
404  int *target_ranks, int target_rank_count)
405 {
406  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
407  long min_progress = -1;
408 
409  /* read all ckpt logs files to find the minimum progress */
410  for (int i = 0; i < target_rank_count; i++) {
411  int rank = target_ranks[i];
412  char dpath[KMR_CKPT_DIRLEN];
413  snprintf(dpath, KMR_CKPT_DIRLEN, "./%s%05d", KMR_CKPT_DIRNAME, rank);
414  char fpath[KMR_CKPT_PATHLEN];
415  kmr_ckpt_make_fname(dpath, KMR_CKPT_FNAME_PREFIX, KMR_CKPT_LOG,
416  rank, 0, fpath, sizeof(fpath));
417  struct kmr_ckpt_log log_hdr;
418  unsigned long total, size = 0;
419  FILE *fp = kmr_ckpt_open_log(mr, fpath, &log_hdr, &total);
420  long max_done_op = 0, cur_op = 0;
421  _Bool num_procs_locked = 0;
422  while (size < total) {
423  struct kmr_ckpt_log_entry e;
424  size_t rc = fread((void *)&e, sizeof(e), 1, fp);
425  if (rc != 1) {
426  char msg[KMR_CKPT_MSGLEN];
427  snprintf(msg, sizeof(msg),
428  "Failed to read a checkpoint log entry");
429  kmr_error(mr, msg);
430  }
431  switch (e.state) {
432  case KMR_CKPT_LOG_WHOLE_START:
433  case KMR_CKPT_LOG_BLOCK_START:
434  case KMR_CKPT_LOG_INDEX_START:
435  cur_op = e.op_seqno;
436  break;
437  case KMR_CKPT_LOG_WHOLE_FINISH:
438  case KMR_CKPT_LOG_BLOCK_FINISH:
439  case KMR_CKPT_LOG_INDEX_FINISH:
440  max_done_op = cur_op;
441  cur_op = 0;
442  break;
443  case KMR_CKPT_LOG_SKIPPED:
444  max_done_op = e.op_seqno;
445  break;
446  case KMR_CKPT_LOG_LOCK_START:
447  assert(num_procs_locked == 0);
448  num_procs_locked = 1;
449  break;
450  case KMR_CKPT_LOG_LOCK_FINISH:
451  assert(num_procs_locked == 1);
452  num_procs_locked = 0;
453  break;
454  }
455  size += sizeof(e);
456  }
457  fclose(fp);
458  if (num_procs_locked && target_rank_count > 1) {
459  /* Can not restart with the different number of processes */
460  char msg[KMR_CKPT_MSGLEN];
461  snprintf(msg, sizeof(msg),
462  "Fault occurred in a critical region and can not restart "
463  "with the different number of processes. "
464  "Restart with the same number of processes with "
465  "the previous run.");
466  kmr_error(mr, msg);
467  }
468  if (min_progress < 0) {
469  min_progress = max_done_op;
470  } else {
471  if (max_done_op < min_progress) {
472  min_progress = max_done_op;
473  }
474  }
475  }
476  assert(min_progress >= 0);
477 
478  /* Find global minimal progress */
479  long global_min_progress;
480  int cc = MPI_Allreduce(&min_progress, &global_min_progress, 1, MPI_LONG,
481  MPI_MIN, mr->comm);
482  assert(cc == MPI_SUCCESS);
483 
484  ckptctx->prev_progress = min_progress;
485  ckptctx->prev_global_progress = global_min_progress;
486 }
487 
488 /* Restore MapReduce progress of the previous run for selective mode. */
489 static void
490 kmr_ckpt_restore_prev_progress_selective(KMR *mr, int *target_ranks,
491  int target_rank_count)
492 {
493  long min_progress = -1, max_progress = -1;
494  /* read all ckpt logs files to find the minimum progress */
495  for (int i = 0; i < target_rank_count; i++) {
496  int rank = target_ranks[i];
497  char dpath[KMR_CKPT_DIRLEN];
498  snprintf(dpath, KMR_CKPT_DIRLEN, "./%s%05d", KMR_CKPT_DIRNAME, rank);
499  char fpath[KMR_CKPT_PATHLEN];
500  kmr_ckpt_make_fname(dpath, KMR_CKPT_FNAME_PREFIX, KMR_CKPT_LOG,
501  rank, 0, fpath, sizeof(fpath));
502  struct kmr_ckpt_log log_hdr;
503  unsigned long total, size = 0;
504  FILE *fp = kmr_ckpt_open_log(mr, fpath, &log_hdr, &total);
505  long target_kvs_id = KMR_CKPT_DUMMY_ID;
506  /* stores kvs transition chains */
507  struct kmr_ckpt_kvs_chains chains;
508  kmr_ckpt_kvs_chains_init(&chains);
509  /* stores id of kvses that has been checkpointed */
510  struct kmr_ckpt_list kvses;
511  kmr_ckpt_int_list_init(&kvses);
512  _Bool num_procs_locked = 0;
513  while (size < total) {
514  struct kmr_ckpt_log_entry e;
515  size_t rc = fread((void *)&e, sizeof(e), 1, fp);
516  if (rc != 1) {
517  char msg[KMR_CKPT_MSGLEN];
518  snprintf(msg, sizeof(msg),
519  "Failed to read a checkpoint log entry");
520  kmr_error(mr, msg);
521  }
522  struct kmr_ckpt_operation op = { .op_seqno = e.op_seqno,
523  .kvi_id = e.kvi_id,
524  .kvo_id = e.kvo_id };
525  long v;
526  switch (e.state) {
527  case KMR_CKPT_LOG_WHOLE_START:
528  target_kvs_id = e.kvo_id;
529  break;
530  case KMR_CKPT_LOG_WHOLE_FINISH:
531  kmr_ckpt_int_list_add(&kvses, target_kvs_id);
532  target_kvs_id = KMR_CKPT_DUMMY_ID;
533  break;
534  case KMR_CKPT_LOG_DELETABLE:
535  v = kmr_ckpt_int_list_del(&kvses, e.kvo_id);
536  assert(v == e.kvo_id);
537  break;
538  case KMR_CKPT_LOG_PROGRESS:
539  case KMR_CKPT_LOG_SKIPPED:
540  if (op.kvi_id == KMR_CKPT_DUMMY_ID) {
541  kmr_ckpt_kvs_chains_new_chain(&chains, op);
542  } else {
543  kmr_ckpt_kvs_chains_connect(&chains, op);
544  }
545  break;
546  case KMR_CKPT_LOG_LOCK_START:
547  assert(num_procs_locked == 0);
548  num_procs_locked = 1;
549  break;
550  case KMR_CKPT_LOG_LOCK_FINISH:
551  assert(num_procs_locked == 1);
552  num_procs_locked = 0;
553  break;
554  }
555  size += sizeof(e);
556  }
557  fclose(fp);
558  if (num_procs_locked) {
559  /* nothing to do as currently selective mode does not support
560  restart with different number of procs */
561  }
562 
563  /* calculate progress */
564  long open_min_progress = LONG_MAX;
565  long open_max_progress = 0;
566  long last_op_id = 0;
567  for (int j = 0; j < chains.chainlst_size; j++) {
568  struct kmr_ckpt_list *list = &(chains.chainlst[j]);
569  struct kmr_ckpt_operation *last_op =
570  (struct kmr_ckpt_operation *)list->tail->val;
571  if (last_op->op_seqno > last_op_id) {
572  last_op_id = last_op->op_seqno;
573  }
574  if (last_op->kvo_id != KMR_CKPT_DUMMY_ID) {
575  /* chain is open */
576  struct kmr_ckpt_list_item *item;
577  for (item = list->tail; item != 0; item = item->prev) {
578  struct kmr_ckpt_operation *op =
579  (struct kmr_ckpt_operation *)item->val;
580  long v = kmr_ckpt_int_list_search(&kvses, op->kvo_id);
581  if (v == op->kvo_id) {
582  if (op->op_seqno < open_min_progress) {
583  open_min_progress = op->op_seqno;
584  }
585  if (op->op_seqno > open_max_progress) {
586  open_max_progress = op->op_seqno;
587  }
588  break;
589  }
590  }
591  }
592  }
593  if (open_min_progress == LONG_MAX && open_max_progress == 0) {
594  open_min_progress = last_op_id;
595  open_max_progress = last_op_id;
596  }
597 
598  /* initialize the skip operation list */
599  struct kmr_ckpt_list *skip_ops = mr->ckpt_ctx->slct_skip_ops;
600  for (int j = 0; j < chains.chainlst_size; j++) {
601  struct kmr_ckpt_list *list = &(chains.chainlst[j]);
602  struct kmr_ckpt_operation *last_op =
603  (struct kmr_ckpt_operation *)list->tail->val;
604  if (last_op->op_seqno <= open_min_progress) {
605  continue;
606  }
607  struct kmr_ckpt_operation *head_op =
608  (struct kmr_ckpt_operation *)list->head->val;
609  if (head_op->op_seqno > open_max_progress) {
610  continue;
611  }
612  if (last_op->kvo_id == KMR_CKPT_DUMMY_ID) {
613  /* chain is closed.
614  add all operations larger than 'open_min_progress' */
615  struct kmr_ckpt_list_item *item;
616  for (item = list->head; item != 0; item = item->next) {
617  struct kmr_ckpt_operation *op =
618  (struct kmr_ckpt_operation *)item->val;
619  if (op->op_seqno > open_min_progress) {
620  kmr_ckpt_int_list_add(skip_ops, op->op_seqno);
621  }
622  }
623  } else {
624  /* chain is open.
625  add all operations larger than 'open_min_progress' and
626  smaller than last-ckpt-saved-operation */
627  _Bool f_add = 0;
628  struct kmr_ckpt_list_item *item;
629  for (item = list->tail; item != 0; item = item->prev) {
630  struct kmr_ckpt_operation *op =
631  (struct kmr_ckpt_operation *)item->val;
632  long v = kmr_ckpt_int_list_search(&kvses, op->kvo_id);
633  if (v == op->kvo_id) {
634  f_add = 1;
635  }
636  if (f_add) {
637  if (op->op_seqno > open_min_progress) {
638  kmr_ckpt_int_list_add(skip_ops, op->op_seqno);
639  }
640  }
641  }
642  }
643  }
644  kmr_ckpt_kvs_chains_free(&chains);
645  kmr_ckpt_int_list_free(&kvses);
646  min_progress = open_min_progress;
647  max_progress = open_max_progress;
648  }
649 
650  assert(max_progress >= 0 && min_progress >= 0);
651  mr->ckpt_ctx->prev_progress = max_progress;
652  mr->ckpt_ctx->prev_global_progress = min_progress;
653 }
654 
655 /* Restore MapReduce progress of the previous run. */
656 static void
657 kmr_ckpt_restore_prev_progress(KMR *mr,
658  int *target_ranks, int target_rank_count)
659 {
660  if (!mr->ckpt_selective) {
661  kmr_ckpt_restore_prev_progress_all(mr, target_ranks,
662  target_rank_count);
663  } else {
664  kmr_ckpt_restore_prev_progress_selective(mr, target_ranks,
665  target_rank_count);
666  }
667 }
668 
669 /* Read checkpoints of previous run and create restart files */
670 static void
671 kmr_ckpt_restore_prev_state(KMR *mr, const char *wdpath,
672  int *target_ranks, int target_rank_count,
673  int prev_nprocs)
674 {
675  /* Load checkpoint data files of each rank */
676  char **rdpaths =
677  (char **)kmr_malloc(sizeof(char *) * (size_t)target_rank_count);
678  struct kmr_ckpt_data_file **dataflsts = (struct kmr_ckpt_data_file **)
679  kmr_malloc(sizeof(struct kmr_ckpt_data_file *) * (size_t)target_rank_count);
680  int *nfiles = (int *)kmr_malloc(sizeof(int) * (size_t)target_rank_count);
681  int max_merge_count = 0;
682  for (int i = 0; i < target_rank_count; i++) {
683  rdpaths[i] = (char*)kmr_malloc(sizeof(char) * KMR_CKPT_DIRLEN);
684  snprintf(rdpaths[i], KMR_CKPT_DIRLEN, "./%s%05d",
685  KMR_CKPT_DIRNAME, target_ranks[i]);
686  kmr_ckpt_get_data_flist(mr, rdpaths[i], &dataflsts[i], &nfiles[i], 1);
687  max_merge_count += nfiles[i];
688  }
689 
690  /* Collect information for merging checkpoint data */
691  struct kmr_ckpt_merge_ctx merge_ctx;
692  merge_ctx.max_each_merge = target_rank_count;
693  merge_ctx.merges_count = 0;
694  merge_ctx.merges = (struct kmr_ckpt_merge *)
695  kmr_malloc(sizeof(struct kmr_ckpt_merge) * (size_t)max_merge_count);
696 
697  for (int i = 0; i < target_rank_count; i++) {
698  struct kmr_ckpt_prev_state prev_state;
699  prev_state.prev_rank = target_ranks[i];
700  prev_state.prev_nprocs = prev_nprocs;
701  prev_state.ckpt_dir = rdpaths[i];
702  prev_state.dataflst = dataflsts[i];
703  prev_state.dataflst_size = nfiles[i];
704  kmr_ckpt_restore_prev_state_each_rank(mr, &prev_state, &merge_ctx);
705  }
706 
707 #if 0
708  /* debug print */
709  if (mr->rank == 0) {
710  for (int i = 0; i < target_rank_count; i++) {
711  fprintf(stderr, "index: %d\n", i);
712  fprintf(stderr, " rdpath: %s\n", rdpaths[i]);
713  fprintf(stderr, " nfiles: %d\n", nfiles[i]);
714  for (int j = 0; j < nfiles[i]; j++) {
715  struct kmr_ckpt_data_file *file = &dataflsts[i][j];
716  fprintf(stderr, " ckptflst: %ld, %s/%s\n",
717  file->kvs_id, file->dname, file->fname);
718  }
719  }
720  fprintf(stderr, "max_merge_count: %d\n", max_merge_count);
721 
722  fprintf(stderr, "\n\n");
723 
724  fprintf(stderr, "merge_count: %d\n", merge_ctx.merges_count);
725  for (int i = 0; i < merge_ctx.merges_count; i++) {
726  fprintf(stderr, "merge\n");
727  fprintf(stderr, " rank: %d\n", merge_ctx.merges[i].rank);
728  fprintf(stderr, " kvs_id: %ld\n", merge_ctx.merges[i].kvs_id);
729  fprintf(stderr, " src_lst: %d\n",
730  merge_ctx.merges[i].src_lst_count);
731  for (int j = 0; j < merge_ctx.merges[i].src_lst_count; j++) {
732  struct kmr_ckpt_merge_source *source =
733  &(merge_ctx.merges[i].src_lst[j]);
734  fprintf(stderr, " rank: %d, n_kvi: %ld, n_kvo: %ld\n",
735  source->rank, source->n_kvi, source->n_kvo);
736  fprintf(stderr, " file: %s/%s\n",
737  source->file->dname, source->file->fname);
738  if (merge_ctx.merges[i].src_lst[j].done_ikv_lst_size > 0) {
739  fprintf(stderr, " done ikvs index: ");
740  for (int k = 0; k < source->done_ikv_lst_size; k++) {
741  fprintf(stderr, "%ld,", source->done_ikv_lst[k]);
742  }
743  fprintf(stderr, "\n");
744  }
745  }
746  }
747  }
748 #endif
749 
750  /* Create sorted checkpoint file */
751  for (int i = 0; i < merge_ctx.merges_count; i++) {
752  struct kmr_ckpt_merge *merge = &merge_ctx.merges[i];
753  for (int j = 0; j < merge->src_lst_count; j++) {
754  if (merge->src_lst[j].n_kvi > 0 &&
755  merge->src_lst[j].done_ikv_lst_size > 0) {
756  /* checkpoint data should be sorted */
757  kmr_ckpt_merge_sort_data(mr, wdpath, merge->kvs_id,
758  &merge->src_lst[j]);
759  }
760  }
761  }
762 
763  /* Write merged checkpoint data to files */
764  for (int i = 0; i < merge_ctx.merges_count; i++) {
765  kmr_ckpt_merge_write_file(mr, wdpath, &merge_ctx.merges[i]);
766  }
767 
768  /* Setup map/reduce operation start point */
769  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
770  for (int i = 0; i < merge_ctx.merges_count; i++) {
771  struct kmr_ckpt_merge *merge = &merge_ctx.merges[i];
772  for (int j = 0; j < merge->src_lst_count; j++) {
773  struct kmr_ckpt_merge_source* mg_src = &merge->src_lst[j];
774  if (mg_src->kvi_op_seqno > 0) {
775  ckptctx->kv_positions_count++;
776  break;
777  }
778  }
779  }
780  ckptctx->kv_positions = (struct kv_position *)
781  kmr_malloc(sizeof(struct kv_position) * (size_t)ckptctx->kv_positions_count);
782  int idx = 0;
783  for (int i = 0; i < merge_ctx.merges_count; i++) {
784  struct kmr_ckpt_merge *merge = &merge_ctx.merges[i];
785  _Bool found = 0;
786  for (int j = 0; j < merge->src_lst_count; j++) {
787  struct kmr_ckpt_merge_source *mg_src = &merge->src_lst[j];
788  if (mg_src->n_kvi > 0) {
789  struct kv_position *kvpos = &ckptctx->kv_positions[idx];
790  if (!found){
791  found = 1;
792  kvpos->op_seqno = mg_src->kvi_op_seqno;
793  kvpos->start_from = mg_src->n_kvi;
794  } else {
795  assert(mg_src->kvi_op_seqno == kvpos->op_seqno);
796  kvpos->start_from += mg_src->n_kvi;
797  }
798  }
799  }
800  if (found) {
801  idx++;
802  }
803  }
804 
805 #if 0
806  /* debug print */
807  for (int i = 0; i < ckptctx->kv_positions_count; i++) {
808  fprintf(stderr, "op_seqno: %ld, start_from: %ld\n",
809  ckptctx->kv_positions[i].op_seqno,
810  ckptctx->kv_positions[i].start_from);
811  }
812 #endif
813 
814  /* post process */
815  for (int i = 0; i < merge_ctx.merges_count; i++) {
816  for (int j = 0; j < merge_ctx.merges[i].src_lst_count; j++) {
817  if (merge_ctx.merges[i].src_lst[j].done_ikv_lst_size > 0) {
818  struct kmr_ckpt_merge_source *mg_src = &(merge_ctx.merges[i].src_lst[j]);
819  kmr_free(mg_src->done_ikv_lst,
820  sizeof(long) * (size_t)mg_src->done_ikv_lst_size);
821  char fpath[KMR_CKPT_PATHLEN];
822  snprintf(fpath, KMR_CKPT_PATHLEN, "%s/%s",
823  mg_src->file->dname, mg_src->file->fname);
824  unlink(fpath);
825  kmr_free(mg_src->file, sizeof(struct kmr_ckpt_data_file));
826  }
827  }
828  }
829  kmr_free(merge_ctx.merges,
830  sizeof(struct kmr_ckpt_merge) * (size_t)max_merge_count);
831  for (int i = 0; i < target_rank_count; i++) {
832  kmr_free(dataflsts[i],
833  sizeof(struct kmr_ckpt_data_file) * (size_t)nfiles[i]);
834  kmr_free(rdpaths[i], sizeof(char) * KMR_CKPT_DIRLEN);
835  }
836  kmr_free(dataflsts,
837  sizeof(struct kmr_ckpt_data_file *) * (size_t)target_rank_count);
838  kmr_free(nfiles, sizeof(int) * (size_t)target_rank_count);
839  kmr_free(rdpaths, sizeof(char *) * (size_t)target_rank_count);
840 }
841 
842 /* Read previous checkpoint data of a rank and record merge information
843  for all mode. */
844 static void
845 kmr_ckpt_restore_prev_state_each_rank_all
846 (KMR *mr, struct kmr_ckpt_prev_state *prev_state,
847  struct kmr_ckpt_merge_ctx *merge_ctx)
848 {
849  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
850  char logfile[KMR_CKPT_PATHLEN];
851  kmr_ckpt_make_fname(prev_state->ckpt_dir, KMR_CKPT_FNAME_PREFIX,
852  KMR_CKPT_LOG, prev_state->prev_rank, 0,
853  logfile, sizeof(logfile));
854  struct kmr_ckpt_log log_hdr;
855  unsigned long total, size = 0;
856  FILE *fp = kmr_ckpt_open_log(mr, logfile, &log_hdr, &total);
857 
858  long cur_op = 0;
859  /* list of ignored inconsistent KVSes */
860  struct kmr_ckpt_kvs_chains kvs_chains;
861  kmr_ckpt_kvs_chains_init(&kvs_chains);
862  /* list of completed spawn key-values */
863  struct kmr_ckpt_list spawn_dones;
864  /* used for map/reduce & spawn map */
865  long nkvi = 0, nkvo = 0;
866  /* used for find undeleted kvs */
867  long undel_kvs_id = 0;
868  struct kmr_ckpt_log_entry last_log = { 0, 0, 0, 0, 0, 0 };
869  while (size < total) {
870  struct kmr_ckpt_log_entry e;
871  size_t rc = fread((void *)&e, sizeof(e), 1, fp);
872  if (rc != 1) {
873  char msg[KMR_CKPT_MSGLEN];
874  snprintf(msg, sizeof(msg),
875  "Failed to read a checkpoint log entry");
876  kmr_error(mr, msg);
877  }
878  struct kmr_ckpt_operation op = { .op_seqno = e.op_seqno,
879  .kvi_id = e.kvi_id,
880  .kvo_id = e.kvo_id };
881  switch (e.state) {
882  case KMR_CKPT_LOG_WHOLE_START:
883  cur_op = e.op_seqno;
884  if (cur_op <= ckptctx->prev_global_progress) {
885  /* Operation can be skipped
886  Procesed in KMR_CKPT_LOG_WHOLE_FINISH */
887  } else { /* cur_op > ckptctx->prev_global_progress */
888  if (e.kvi_id == KMR_CKPT_DUMMY_ID) {
889  /* Ignore a kvs generated by kmr_map_once called
890  in this region */
891  kmr_ckpt_merge_ignore_ckpt_data(e.kvo_id, prev_state,
892  merge_ctx);
893  kmr_ckpt_kvs_chains_new_chain(&kvs_chains, op);
894  } else {
895  /* If the kvs is generated from a KVS generated by
896  kmr_map_once in this regin, ignore it */
897  int cc = kmr_ckpt_merge_check_ignorable(&kvs_chains,
898  e.kvi_id);
899  if (cc == 0) {
900  kmr_ckpt_merge_ignore_ckpt_data(e.kvo_id, prev_state,
901  merge_ctx);
902  kmr_ckpt_kvs_chains_connect(&kvs_chains, op);
903  }
904  }
905  }
906  if (cur_op > ckptctx->prev_progress) {
907  last_log = e;
908  }
909  break;
910  case KMR_CKPT_LOG_WHOLE_FINISH:
911  assert(e.op_seqno == cur_op);
912  kmr_ckpt_merge_store_ckpt_data(e.kvo_id, mr->rank, -1,
913  prev_state, merge_ctx);
914  cur_op = 0;
915  break;
916  case KMR_CKPT_LOG_BLOCK_START:
917  cur_op = e.op_seqno;
918  assert(e.kvi_id != KMR_CKPT_DUMMY_ID);
919  if (cur_op <= ckptctx->prev_global_progress) {
920  /* Operation can be skipped
921  Procesed in KMR_CKPT_LOG_BLOCK_FINISH */
922  } else { /* cur_op > ckptctx->prev_global_progress */
923  /* If the kvs is generated from a KVS generated by
924  kmr_map_once in this regin, ignore it */
925  int cc = kmr_ckpt_merge_check_ignorable(&kvs_chains, e.kvi_id);
926  if (cc == 0) {
927  kmr_ckpt_merge_ignore_ckpt_data(e.kvo_id, prev_state,
928  merge_ctx);
929  kmr_ckpt_kvs_chains_connect(&kvs_chains, op);
930  }
931  }
932  if (cur_op > ckptctx->prev_progress) {
933  last_log = e;
934  nkvi = e.n_kvi;
935  nkvo = e.n_kvo;
936  }
937  break;
938  case KMR_CKPT_LOG_BLOCK_ADD:
939  assert(e.op_seqno == cur_op);
940  if (cur_op > ckptctx->prev_progress) {
941  last_log = e;
942  nkvi += e.n_kvi;
943  nkvo += e.n_kvo;
944  }
945  break;
946  case KMR_CKPT_LOG_BLOCK_FINISH:
947  assert(e.op_seqno == cur_op);
948  kmr_ckpt_merge_store_ckpt_data(e.kvo_id, mr->rank, -1,
949  prev_state, merge_ctx);
950  cur_op = 0;
951  if (cur_op > ckptctx->prev_progress) {
952  nkvi = 0;
953  nkvo = 0;
954  }
955  break;
956  case KMR_CKPT_LOG_INDEX_START:
957  cur_op = e.op_seqno;
958  assert(e.kvi_id != KMR_CKPT_DUMMY_ID);
959  if (cur_op <= ckptctx->prev_global_progress) {
960  /* Operation can be skipped
961  Procesed in KMR_CKPT_LOG_BLOCK_FINISH */
962  } else { /* cur_op > ckptctx->prev_global_progress */
963  /* If the kvs is generated from a KVS generated by
964  kmr_map_once in this regin, ignore it */
965  int cc = kmr_ckpt_merge_check_ignorable(&kvs_chains, e.kvi_id);
966  if (cc == 0) {
967  kmr_ckpt_merge_ignore_ckpt_data(e.kvo_id, prev_state,
968  merge_ctx);
969  kmr_ckpt_kvs_chains_connect(&kvs_chains, op);
970  }
971  }
972  if (cur_op > ckptctx->prev_progress) {
973  last_log = e;
974  kmr_ckpt_int_list_init(&spawn_dones);
975  nkvi = e.n_kvi;
976  nkvo = e.n_kvo;
977  }
978  break;
979  case KMR_CKPT_LOG_INDEX_ADD:
980  assert(e.op_seqno == cur_op);
981  if (cur_op > ckptctx->prev_progress) {
982  last_log = e;
983  kmr_ckpt_int_list_add(&spawn_dones, e.n_kvi);
984  nkvi += 1;
985  nkvo += e.n_kvo;
986  }
987  break;
988  case KMR_CKPT_LOG_INDEX_FINISH:
989  assert(e.op_seqno == cur_op);
990  kmr_ckpt_merge_store_ckpt_data(e.kvo_id, mr->rank, -1,
991  prev_state, merge_ctx);
992  cur_op = 0;
993  if (cur_op > ckptctx->prev_progress) {
994  kmr_ckpt_int_list_free(&spawn_dones);
995  nkvi = 0;
996  nkvo = 0;
997  }
998  break;
999  case KMR_CKPT_LOG_DELETE_START:
1000  undel_kvs_id = e.kvi_id;
1001  break;
1002  case KMR_CKPT_LOG_DELETE_FINISH:
1003  assert(e.kvi_id = undel_kvs_id);
1004  undel_kvs_id = 0;
1005  break;
1006  case KMR_CKPT_LOG_SKIPPED:
1007  kmr_ckpt_merge_store_ckpt_data(e.kvo_id, mr->rank, -1,
1008  prev_state, merge_ctx);
1009  break;
1010  }
1011  size += sizeof(e);
1012  }
1013  if (cur_op != 0) {
1014  /* Process the last log */
1015  switch (last_log.state) {
1016  case KMR_CKPT_LOG_WHOLE_START:
1017  kmr_ckpt_merge_ignore_ckpt_data(last_log.kvo_id, prev_state,
1018  merge_ctx);
1019  break;
1020  case KMR_CKPT_LOG_BLOCK_START:
1021  kmr_ckpt_merge_ignore_ckpt_data(last_log.kvo_id, prev_state,
1022  merge_ctx);
1023  break;
1024  case KMR_CKPT_LOG_BLOCK_ADD:
1025  kmr_ckpt_merge_store_ckpt_data(last_log.kvo_id, mr->rank, nkvo,
1026  prev_state, merge_ctx);
1027  kmr_ckpt_merge_update_ckpt_data(last_log.kvi_id, mr->rank,
1028  last_log.op_seqno, nkvi, NULL,
1029  prev_state, merge_ctx);
1030  break;
1031  case KMR_CKPT_LOG_INDEX_START:
1032  kmr_ckpt_merge_ignore_ckpt_data(last_log.kvo_id, prev_state,
1033  merge_ctx);
1034  break;
1035  case KMR_CKPT_LOG_INDEX_ADD:
1036  kmr_ckpt_merge_store_ckpt_data(last_log.kvo_id, mr->rank, nkvo,
1037  prev_state, merge_ctx);
1038  assert(nkvi >= spawn_dones.size);
1039  kmr_ckpt_merge_update_ckpt_data(last_log.kvi_id, mr->rank,
1040  last_log.op_seqno, nkvi,
1041  &spawn_dones, prev_state,
1042  merge_ctx);
1043  kmr_ckpt_int_list_free(&spawn_dones);
1044  break;
1045  }
1046  }
1047  if (undel_kvs_id != 0) {
1048  /* Ignore the ckpt data */
1049  kmr_ckpt_merge_ignore_ckpt_data(undel_kvs_id, prev_state, merge_ctx);
1050  }
1051  kmr_ckpt_kvs_chains_free(&kvs_chains);
1052  fclose(fp);
1053 
1054  for (int i = 0; i < prev_state->dataflst_size; i++) {
1055  if (prev_state->dataflst[i].checked != 1) {
1056  char msg[KMR_CKPT_MSGLEN];
1057  snprintf(msg, sizeof(msg),
1058  "Checkpoint state is wrong. "
1059  "Delete all checkpoint and restart again");
1060  kmr_error(mr, msg);
1061  }
1062  }
1063 }
1064 
1065 /* Read previous checkpoint data of a rank and record merge information
1066  for selective mode. */
1067 static void
1068 kmr_ckpt_restore_prev_state_each_rank_selective
1069 (KMR *mr, struct kmr_ckpt_prev_state *prev_state,
1070  struct kmr_ckpt_merge_ctx *merge_ctx)
1071 {
1072  char logfile[KMR_CKPT_PATHLEN];
1073  kmr_ckpt_make_fname(prev_state->ckpt_dir, KMR_CKPT_FNAME_PREFIX,
1074  KMR_CKPT_LOG, prev_state->prev_rank, 0,
1075  logfile, sizeof(logfile));
1076  struct kmr_ckpt_log log_hdr;
1077  unsigned long total, size = 0;
1078  FILE *fp = kmr_ckpt_open_log(mr, logfile, &log_hdr, &total);
1079 
1080  /* stores id of kvses whose ckpt data files should be deleted */
1081  struct kmr_ckpt_list kvses;
1082  kmr_ckpt_int_list_init(&kvses);
1083  while (size < total) {
1084  struct kmr_ckpt_log_entry e;
1085  size_t rc = fread((void *)&e, sizeof(e), 1, fp);
1086  if (rc != 1) {
1087  char msg[KMR_CKPT_MSGLEN];
1088  snprintf(msg, sizeof(msg),
1089  "Failed to read a checkpoint log entry");
1090  kmr_error(mr, msg);
1091  }
1092  switch (e.state) {
1093  case KMR_CKPT_LOG_DELETABLE:
1094  kmr_ckpt_int_list_add(&kvses, e.kvo_id);
1095  break;
1096  }
1097  size += sizeof(e);
1098  }
1099  struct kmr_ckpt_data_file *dataflst = prev_state->dataflst;
1100  for (int i = 0; i < prev_state->dataflst_size; i++) {
1101  struct kmr_ckpt_data_file *file = &dataflst[i];
1102  long v = kmr_ckpt_int_list_rsearch(&kvses, file->kvs_id);
1103  if (v == file->kvs_id) {
1104  /* ignore should-be-deleted checkpoint data */
1105  kmr_ckpt_merge_ignore_ckpt_data(file->kvs_id, prev_state,
1106  merge_ctx);
1107  } else {
1108  kmr_ckpt_merge_store_ckpt_data(file->kvs_id, mr->rank, -1,
1109  prev_state, merge_ctx);
1110  }
1111  }
1112  kmr_ckpt_int_list_free(&kvses);
1113  fclose(fp);
1114 
1115  for (int i = 0; i < prev_state->dataflst_size; i++) {
1116  if (prev_state->dataflst[i].checked != 1) {
1117  char msg[KMR_CKPT_MSGLEN];
1118  snprintf(msg, sizeof(msg),
1119  "Checkpoint state is wrong. "
1120  "Delete all checkpoint and restart again");
1121  kmr_error(mr, msg);
1122  }
1123  }
1124 }
1125 
1126 /* Read previous checkpoint data of a rank and record merge information. */
1127 static void
1128 kmr_ckpt_restore_prev_state_each_rank(KMR *mr,
1129  struct kmr_ckpt_prev_state *prev_state,
1130  struct kmr_ckpt_merge_ctx *merge_ctx)
1131 {
1132  if (!mr->ckpt_selective) {
1133  kmr_ckpt_restore_prev_state_each_rank_all(mr, prev_state, merge_ctx);
1134  } else {
1135  kmr_ckpt_restore_prev_state_each_rank_selective(mr, prev_state,
1136  merge_ctx);
1137  }
1138 }
1139 
/* Tells whether KVI_ID belongs to one of the chains in CHAINS.
   Returns 0 when kmr_ckpt_kvs_chains_find() finds a chain for KVI_ID,
   and -1 when it finds none. */
static int
kmr_ckpt_merge_check_ignorable(struct kmr_ckpt_kvs_chains *chains, long kvi_id)
{
    struct kmr_ckpt_list *chain = kmr_ckpt_kvs_chains_find(chains, kvi_id);
    return (chain != 0) ? 0 : -1;
}
1152 
1153 static struct kmr_ckpt_data_file *
1154 kmr_ckpt_find_data_file(long kvs_id,
1155  struct kmr_ckpt_data_file *dataflst, int nfiles)
1156 {
1157  struct kmr_ckpt_data_file *file = 0;
1158  for (int i = 0; i < nfiles; i++) {
1159  if (dataflst[i].kvs_id == kvs_id) {
1160  file = &dataflst[i];
1161  break;
1162  }
1163  }
1164  return file;
1165 }
1166 
1167 /* Marks the ckpt data as not-to-be-merged. */
1168 static void
1169 kmr_ckpt_merge_ignore_ckpt_data(long kvo_id,
1170  struct kmr_ckpt_prev_state *prev_state,
1171  struct kmr_ckpt_merge_ctx *merge_ctx)
1172 {
1173  struct kmr_ckpt_data_file *file =
1174  kmr_ckpt_find_data_file(kvo_id, prev_state->dataflst,
1175  prev_state->dataflst_size);
1176  if (file == 0) {
1177  return;
1178  }
1179  file->checked = 1;
1180 
1181  /* If the ckpt data is once marked as 'to-be-merged',
1182  remove this ckpt data from merge source list. */
1183  if (file->merged == 1) {
1184  struct kmr_ckpt_merge *merge = 0;
1185  for (int i = 0; i < merge_ctx->merges_count; i++) {
1186  if (merge_ctx->merges[i].kvs_id == kvo_id) {
1187  merge = &merge_ctx->merges[i];
1188  break;
1189  }
1190  }
1191  assert(merge != 0);
1192  int idx = -1;
1193  struct kmr_ckpt_merge_source *mg_src = 0;
1194  for (int i = 0; i < merge->src_lst_count; i++) {
1195  if (merge->src_lst[i].rank == prev_state->prev_rank) {
1196  idx = i;
1197  mg_src = &merge->src_lst[i];
1198  break;
1199  }
1200  }
1201  assert(idx != -1 && mg_src != 0);
1202  /* Delete the found mg_src */
1203  if (mg_src->done_ikv_lst_size != 0) {
1204  kmr_free(mg_src->done_ikv_lst,
1205  sizeof(long) * (size_t)mg_src->done_ikv_lst_size);
1206  }
1207  if (merge->src_lst_count == 1) {
1208  memset(mg_src, 0, sizeof(struct kmr_ckpt_merge_source));
1209  } else {
1210  for (int i = idx; i < merge->src_lst_count - 1; i++) {
1211  struct kmr_ckpt_merge_source *target = &merge->src_lst[i];
1212  struct kmr_ckpt_merge_source *source = &merge->src_lst[i + 1];
1213  memcpy(target, source, sizeof(struct kmr_ckpt_merge_source));
1214  }
1215  memset(&merge->src_lst[merge->src_lst_count - 1], 0,
1216  sizeof(struct kmr_ckpt_merge_source));
1217  }
1218  merge->src_lst_count -= 1;
1219  }
1220  file->merged = 0;
1221 }
1222 
1223 /* Marks the ckpt data as to-be-merged. */
1224 static void
1225 kmr_ckpt_merge_store_ckpt_data(long kvo_id, int rank, long nkvo,
1226  struct kmr_ckpt_prev_state *prev_state,
1227  struct kmr_ckpt_merge_ctx *merge_ctx)
1228 {
1229  struct kmr_ckpt_data_file *file =
1230  kmr_ckpt_find_data_file(kvo_id, prev_state->dataflst,
1231  prev_state->dataflst_size);
1232  if (file == 0 || file->merged == 1) {
1233  return;
1234  }
1235  file->checked = 1;
1236  file->merged = 1;
1237 
1238  struct kmr_ckpt_merge *merge = 0;
1239  int cnt = merge_ctx->merges_count;
1240  for (int i = 0; i < cnt; i++) {
1241  if (merge_ctx->merges[i].kvs_id == kvo_id) {
1242  merge = &merge_ctx->merges[i];
1243  break;
1244  }
1245  }
1246  if (merge == 0) {
1247  /* Initialize a new merge */
1248  merge = &merge_ctx->merges[cnt++];
1249  merge->rank = rank;
1250  merge->kvs_id = kvo_id;
1251  merge->src_lst = (struct kmr_ckpt_merge_source *)
1252  kmr_malloc(sizeof(struct kmr_ckpt_merge_source) * (size_t)merge_ctx->max_each_merge);
1253  merge->src_lst_count = 0;
1254  merge_ctx->merges_count = cnt;
1255  }
1256  struct kmr_ckpt_merge_source *mg_src = &merge->src_lst[merge->src_lst_count];
1257  mg_src->rank = prev_state->prev_rank;
1258  mg_src->n_kvi = -1;
1259  mg_src->n_kvo = nkvo;
1260  mg_src->done_ikv_lst = 0;
1261  mg_src->done_ikv_lst_size = 0;
1262  mg_src->kvi_op_seqno = -1;
1263  mg_src->file = file;
1264  merge->src_lst_count += 1;
1265 }
1266 
/* Comparison callback for qsort() over an array of long.
   V1 and V2 point to the two long values to compare.  Returns 1 when
   *V1 > *V2, -1 when *V1 < *V2, and 0 when they are equal.  The values
   are compared directly (never subtracted), so no overflow can occur.
   The pointers are read through const-qualified types so the const
   qualifier of the arguments is not cast away. */
static int
kmr_ckpt_cmp_long(const void *v1, const void *v2)
{
    const long lhs = *(const long *)v1;
    const long rhs = *(const long *)v2;
    return (lhs > rhs) - (lhs < rhs);
}
1280 
1281 /* Update information of ckpt data merge. */
1282 static void
1283 kmr_ckpt_merge_update_ckpt_data(long kvi_id, int rank,
1284  long kvi_op_seqno, long nkvi,
1285  struct kmr_ckpt_list *done_ikv_lst,
1286  struct kmr_ckpt_prev_state *prev_state,
1287  struct kmr_ckpt_merge_ctx *merge_ctx)
1288 {
1289  struct kmr_ckpt_data_file *file =
1290  kmr_ckpt_find_data_file(kvi_id, prev_state->dataflst,
1291  prev_state->dataflst_size);
1292  assert(file != 0);
1293  assert(file->checked == 1 && file->merged == 1);
1294 
1295  struct kmr_ckpt_merge *merge = 0;
1296  for (int i = 0; i < merge_ctx->merges_count; i++) {
1297  if (merge_ctx->merges[i].kvs_id == kvi_id) {
1298  merge = &merge_ctx->merges[i];
1299  break;
1300  }
1301  }
1302  assert(merge != 0);
1303  struct kmr_ckpt_merge_source *mg_src = 0;
1304  for (int i = 0; i < merge->src_lst_count; i++) {
1305  if (merge->src_lst[i].rank == prev_state->prev_rank) {
1306  mg_src = &merge->src_lst[i];
1307  break;
1308  }
1309  }
1310  assert(mg_src != 0);
1311  assert(mg_src->n_kvo == -1);
1312  mg_src->n_kvi = nkvi;
1313  mg_src->kvi_op_seqno = kvi_op_seqno;
1314  if (done_ikv_lst->size != 0) {
1315  mg_src->done_ikv_lst =
1316  (long *)kmr_malloc(sizeof(long) * (size_t)done_ikv_lst->size);
1317  struct kmr_ckpt_list_item *item;
1318  int idx = 0;
1319  for (item = done_ikv_lst->head; item != 0; item = item->next) {
1320  mg_src->done_ikv_lst[idx] = *(long *)item->val;
1321  idx += 1;
1322  }
1323  qsort(mg_src->done_ikv_lst, (size_t)done_ikv_lst->size, sizeof(long),
1324  kmr_ckpt_cmp_long);
1325  mg_src->done_ikv_lst_size = done_ikv_lst->size;
1326  }
1327 }
1328 
1329 /* Sort a previous checkpoint data of a kvs so that processed key-values
1330  are moved to the front and unprocessed ones are moved to the back.
1331  It creates a new checkpoint data and replaces pointer to data. */
1332 static void
1333 kmr_ckpt_merge_sort_data(KMR *mr, const char *wdpath, long kvs_id,
1334  struct kmr_ckpt_merge_source *mrg_src)
1335 {
1336  assert(mrg_src->file->merged == 1);
1337  struct kmr_ckpt_data_file *ndata = (struct kmr_ckpt_data_file *)
1338  kmr_malloc(sizeof(struct kmr_ckpt_data_file));
1339  ndata->kvs_id = kvs_id;
1340  ndata->checked = 1;
1341  ndata->merged = 1;
1342  snprintf(ndata->fname, sizeof(ndata->fname), "%s.sorted",
1343  mrg_src->file->fname);
1344  strncpy(ndata->dname, wdpath, sizeof(ndata->dname) - 1);
1345 
1346  char dst_fpath[KMR_CKPT_PATHLEN];
1347  snprintf(dst_fpath, KMR_CKPT_PATHLEN, "%s/%s", ndata->dname, ndata->fname);
1348  int cc = access(dst_fpath, F_OK);
1349  assert(cc != 0);
1350  FILE *wfp = kmr_ckpt_open_path(mr, dst_fpath, "w");
1351 
1352  char tmp_fpath[KMR_CKPT_PATHLEN];
1353  snprintf(tmp_fpath, KMR_CKPT_PATHLEN, "%s/%s.rest",
1354  ndata->dname, ndata->fname);
1355  cc = access(tmp_fpath, F_OK);
1356  assert(cc != 0);
1357 
1358  char src_fpath[KMR_CKPT_PATHLEN];
1359  snprintf(src_fpath, KMR_CKPT_PATHLEN, "%s/%s",
1360  mrg_src->file->dname, mrg_src->file->fname);
1361  struct stat sb;
1362  cc = stat(src_fpath, &sb);
1363  if (cc != 0) {
1364  char msg[KMR_CKPT_MSGLEN];
1365  snprintf(msg, sizeof(msg),
1366  "Failed to access a checkpoint data file %s", src_fpath);
1367  kmr_error(mr, msg);
1368  }
1369  FILE *rfp = kmr_ckpt_open_path(mr, src_fpath, "r");
1370 
1371  /* Write header */
1372  size_t hdrsiz = offsetof(struct kmr_ckpt_data, data);
1373  struct kmr_ckpt_data hdr;
1374  size_t rc = fread((void *)&hdr, hdrsiz, 1, rfp);
1375  if (rc != 1) {
1376  char msg[KMR_CKPT_MSGLEN];
1377  snprintf(msg, sizeof(msg),
1378  "Failed to read a checkpoint data file %s", src_fpath);
1379  kmr_error(mr, msg);
1380  }
1381  rc = fwrite((void *)&hdr, hdrsiz, 1, wfp);
1382  if (rc != 1) {
1383  char msg[KMR_CKPT_MSGLEN];
1384  snprintf(msg, sizeof(msg),
1385  "Failed to write a checkpoint data file %s", dst_fpath);
1386  kmr_error(mr, msg);
1387  }
1388 
1389  /* Write kv */
1390  size_t total_size = (size_t)sb.st_size - hdrsiz;
1391  size_t cur_size = 0;
1392  /* Write processed kv */
1393  {
1394  size_t read_size = 0;
1395  long idx = 0, start_idx = 0;
1396  size_t bufsiz = 8;
1397  void *buf = kmr_malloc(bufsiz);
1398  FILE *wfp2 = kmr_ckpt_open_path(mr, tmp_fpath, "w");
1399  while (read_size < total_size) {
1400  struct kmr_kvs_entry e;
1401  /* Read */
1402  size_t kv_hdrsiz = offsetof(struct kmr_kvs_entry, c);
1403  rc = fread((void *)&e, kv_hdrsiz, 1, rfp);
1404  if (rc != 1) {
1405  char msg[KMR_CKPT_MSGLEN];
1406  snprintf(msg, sizeof(msg),
1407  "Failed to read a checkpoint data file %s", src_fpath);
1408  kmr_error(mr, msg);
1409  }
1410  size_t kv_bdysiz =
1411  (size_t)KMR_ALIGN(e.klen) + (size_t)KMR_ALIGN(e.vlen);
1412  if (bufsiz < kv_bdysiz) {
1413  bufsiz = kv_bdysiz;
1414  buf = kmr_realloc(buf, bufsiz);
1415  }
1416  rc = fread(buf, kv_bdysiz, 1, rfp);
1417  if (rc != 1) {
1418  char msg[KMR_CKPT_MSGLEN];
1419  snprintf(msg, sizeof(msg),
1420  "Failed to read a checkpoint data file %s", src_fpath);
1421  kmr_error(mr, msg);
1422  }
1423  /* Write */
1424  FILE *twfp;
1425  _Bool incp = 0;
1426  for (long i = start_idx; i < mrg_src->done_ikv_lst_size; i++) {
1427  if (idx == mrg_src->done_ikv_lst[i]) {
1428  incp = 1;
1429  break;
1430  }
1431  }
1432  if (incp) {
1433  twfp = wfp;
1434  cur_size += kv_hdrsiz + kv_bdysiz;
1435  start_idx++;
1436  } else {
1437  twfp = wfp2;
1438  }
1439  rc = fwrite((void *)&e, kv_hdrsiz, 1, twfp);
1440  if (rc != 1) {
1441  char msg[KMR_CKPT_MSGLEN];
1442  snprintf(msg, sizeof(msg),
1443  "Failed to write a checkpoint data file %s",
1444  dst_fpath);
1445  kmr_error(mr, msg);
1446  }
1447  rc = fwrite(buf, kv_bdysiz, 1, twfp);
1448  if (rc != 1) {
1449  char msg[KMR_CKPT_MSGLEN];
1450  snprintf(msg, sizeof(msg),
1451  "Failed to write a checkpoint data file %s",
1452  dst_fpath);
1453  kmr_error(mr, msg);
1454  }
1455  read_size += kv_hdrsiz + kv_bdysiz;
1456  idx++;
1457  }
1458  kmr_free(buf, bufsiz);
1459  kmr_ckpt_flush(mr, wfp2);
1460  fclose(wfp2);
1461  }
1462  fclose(rfp);
1463 
1464  /* Write unprocessed kv */
1465  cc = stat(tmp_fpath, &sb);
1466  if (cc != 0) {
1467  char msg[KMR_CKPT_MSGLEN];
1468  snprintf(msg, sizeof(msg),
1469  "Failed to access a checkpoint data file %s", tmp_fpath);
1470  kmr_error(mr, msg);
1471  }
1472  rfp = kmr_ckpt_open_path(mr, tmp_fpath, "r");
1473  void *buf = kmr_malloc((size_t)sb.st_size);
1474  rc = fread(buf, (size_t)sb.st_size, 1, rfp);
1475  if (rc != 1) {
1476  char msg[KMR_CKPT_MSGLEN];
1477  snprintf(msg, sizeof(msg),
1478  "Failed to read a checkpoint data file %s", tmp_fpath);
1479  kmr_error(mr, msg);
1480  }
1481  rc = fwrite(buf, (size_t)sb.st_size, 1, wfp);
1482  if (rc != 1) {
1483  char msg[KMR_CKPT_MSGLEN];
1484  snprintf(msg, sizeof(msg),
1485  "Failed to write a checkpoint data file %s", dst_fpath);
1486  kmr_error(mr, msg);
1487  }
1488  kmr_free(buf, (size_t)sb.st_size);
1489  assert((cur_size + (size_t)sb.st_size) == total_size);
1490  fclose(rfp);
1491  unlink(tmp_fpath);
1492  kmr_ckpt_flush(mr, wfp);
1493  fclose(wfp);
1494 
1495  mrg_src->file = ndata;
1496 }
1497 
1498 /* Merge checkpoint data files */
1499 static void
1500 kmr_ckpt_merge_write_file(KMR *mr, const char *wdpath,
1501  struct kmr_ckpt_merge *merge)
1502 {
1503  char dst_fpath[KMR_CKPT_PATHLEN];
1504  kmr_ckpt_make_fname(wdpath, KMR_CKPT_FNAME_PREFIX,
1505  KMR_CKPT_DATA, merge->rank, merge->kvs_id,
1506  dst_fpath, sizeof(dst_fpath));
1507  int cc = access(dst_fpath, F_OK);
1508  assert(cc != 0);
1509  FILE *wfp = kmr_ckpt_open_path(mr, dst_fpath, "w");
1510 
1511  /* Write header */
1512  char hdr_src_fpath[KMR_CKPT_PATHLEN];
1513  snprintf(hdr_src_fpath, KMR_CKPT_PATHLEN, "%s/%s",
1514  merge->src_lst[0].file->dname, merge->src_lst[0].file->fname);
1515  FILE *rfp = kmr_ckpt_open_path(mr, hdr_src_fpath, "r");
1516  size_t hdrsiz = offsetof(struct kmr_ckpt_data, data);
1517  /* Update nprocs and rank in header */
1518  struct kmr_ckpt_data hdr;
1519  size_t rc = fread((void *)&hdr, hdrsiz, 1, rfp);
1520  if (rc != 1) {
1521  char msg[KMR_CKPT_MSGLEN];
1522  snprintf(msg, sizeof(msg),
1523  "Failed to read a checkpoint data file %s", hdr_src_fpath);
1524  kmr_error(mr, msg);
1525  }
1526  hdr.nprocs = mr->nprocs;
1527  hdr.rank = mr->rank;
1528  rc = fwrite((void *)&hdr, hdrsiz, 1, wfp);
1529  if (rc != 1) {
1530  char msg[KMR_CKPT_MSGLEN];
1531  snprintf(msg, sizeof(msg),
1532  "Failed to write a checkpoint data file %s", dst_fpath);
1533  kmr_error(mr, msg);
1534  }
1535  fclose(rfp);
1536 
1537  /* Open to-be-merged files */
1538  struct merge_file {
1539  FILE *fp;
1540  size_t size; /* total key-value size in byte */
1541  size_t cur_size; /* read key-value size in byte */
1542  };
1543  struct merge_file *mfs = (struct merge_file *)
1544  kmr_malloc(sizeof(struct merge_file) * (size_t)merge->src_lst_count);
1545  for (int i = 0; i < merge->src_lst_count; i++) {
1546  struct kmr_ckpt_data_file *file = merge->src_lst[i].file;
1547  assert(file->merged == 1);
1548  char fpath[KMR_CKPT_PATHLEN];
1549  snprintf(fpath, KMR_CKPT_PATHLEN, "%s/%s", file->dname, file->fname);
1550  struct stat sb;
1551  cc = stat(fpath, &sb);
1552  if (cc != 0) {
1553  char msg[KMR_CKPT_MSGLEN];
1554  snprintf(msg, sizeof(msg),
1555  "Failed to access a checkpoint data file %s", fpath);
1556  kmr_error(mr, msg);
1557  }
1558  hdrsiz = offsetof(struct kmr_ckpt_data, data);
1559  mfs[i].fp = kmr_ckpt_open_path(mr, fpath, "r");
1560  mfs[i].size = (size_t)sb.st_size - hdrsiz;
1561  mfs[i].cur_size = 0;
1562  fseek(mfs[i].fp, (long)hdrsiz, SEEK_SET);
1563  }
1564 
1565  /* Write processed kv when used as kvi */
1566  for (int i = 0; i < merge->src_lst_count; i++) {
1567  struct kmr_ckpt_merge_source *mg_src = &merge->src_lst[i];
1568  if (mg_src->n_kvi > 0) {
1569  assert(mg_src->n_kvo == -1);
1570  long kvicnt = 0;
1571  size_t bufsiz = 8;
1572  unsigned char *buf = (unsigned char *)kmr_malloc(bufsiz);
1573  while (mfs[i].cur_size < mfs[i].size) {
1574  if (kvicnt >= mg_src->n_kvi) {
1575  break;
1576  }
1577  struct kmr_kvs_entry e;
1578  /* Read */
1579  size_t kv_hdrsiz = offsetof(struct kmr_kvs_entry, c);
1580  rc = fread((void *)&e, kv_hdrsiz, 1, mfs[i].fp);
1581  if (rc != 1) {
1582  char msg[KMR_CKPT_MSGLEN];
1583  snprintf(msg, sizeof(msg),
1584  "Failed to read a checkpoint data file");
1585  kmr_error(mr, msg);
1586  }
1587  size_t kv_bdysiz =
1588  (size_t)KMR_ALIGN(e.klen) + (size_t)KMR_ALIGN(e.vlen);
1589  if (bufsiz < kv_bdysiz) {
1590  bufsiz = kv_bdysiz;
1591  buf = (unsigned char *)kmr_realloc(buf, bufsiz);
1592  }
1593  rc = fread((void *)buf, kv_bdysiz, 1, mfs[i].fp);
1594  if (rc != 1) {
1595  char msg[KMR_CKPT_MSGLEN];
1596  snprintf(msg, sizeof(msg),
1597  "Failed to read a checkpoint data file");
1598  kmr_error(mr, msg);
1599  }
1600  /* Write */
1601  rc = fwrite((void *)&e, kv_hdrsiz, 1, wfp);
1602  if (rc != 1) {
1603  char msg[KMR_CKPT_MSGLEN];
1604  snprintf(msg, sizeof(msg),
1605  "Failed to write a checkpoint data file %s",
1606  dst_fpath);
1607  kmr_error(mr, msg);
1608  }
1609  rc = fwrite((void *)buf, kv_bdysiz, 1, wfp);
1610  if (rc != 1) {
1611  char msg[KMR_CKPT_MSGLEN];
1612  snprintf(msg, sizeof(msg),
1613  "Failed to write a checkpoint data file %s",
1614  dst_fpath);
1615  kmr_error(mr, msg);
1616  }
1617  kvicnt += 1;
1618  mfs[i].cur_size += kv_hdrsiz + kv_bdysiz;
1619  }
1620  kmr_free(buf, bufsiz);
1621  }
1622  }
1623 
1624  /* Write generated kv when used as kvo */
1625  for (int i = 0; i < merge->src_lst_count; i++) {
1626  struct kmr_ckpt_merge_source *mg_src = &merge->src_lst[i];
1627  long kvocnt = 0;
1628  size_t bufsiz = 8;
1629  unsigned char *buf = (unsigned char *)kmr_malloc(bufsiz);
1630  while (mfs[i].cur_size < mfs[i].size) {
1631  if ((mg_src->n_kvo >= 0) && (kvocnt >= mg_src->n_kvo)) {
1632  break;
1633  }
1634  struct kmr_kvs_entry e;
1635  /* Read */
1636  size_t kv_hdrsiz = offsetof(struct kmr_kvs_entry, c);
1637  rc = fread((void *)&e, kv_hdrsiz, 1, mfs[i].fp);
1638  if (rc != 1) {
1639  char msg[KMR_CKPT_MSGLEN];
1640  snprintf(msg, sizeof(msg),
1641  "Failed to read a checkpoint data file");
1642  kmr_error(mr, msg);
1643  }
1644  size_t kv_bdysiz =
1645  (size_t)KMR_ALIGN(e.klen) + (size_t)KMR_ALIGN(e.vlen);
1646  if (bufsiz < kv_bdysiz) {
1647  bufsiz = kv_bdysiz;
1648  buf = (unsigned char *)kmr_realloc(buf, bufsiz);
1649  }
1650  rc = fread((void *)buf, kv_bdysiz, 1, mfs[i].fp);
1651  if (rc != 1) {
1652  char msg[KMR_CKPT_MSGLEN];
1653  snprintf(msg, sizeof(msg),
1654  "Failed to read a checkpoint data file");
1655  kmr_error(mr, msg);
1656  }
1657  /* Write */
1658  rc = fwrite((void *)&e, kv_hdrsiz, 1, wfp);
1659  if (rc != 1) {
1660  char msg[KMR_CKPT_MSGLEN];
1661  snprintf(msg, sizeof(msg),
1662  "Failed to write a checkpoint data file %s",
1663  dst_fpath);
1664  kmr_error(mr, msg);
1665  }
1666  rc = fwrite((void *)buf, kv_bdysiz, 1, wfp);
1667  if (rc != 1) {
1668  char msg[KMR_CKPT_MSGLEN];
1669  snprintf(msg, sizeof(msg),
1670  "Failed to write a checkpoint data file %s",
1671  dst_fpath);
1672  kmr_error(mr, msg);
1673  }
1674  kvocnt += 1;
1675  mfs[i].cur_size += kv_hdrsiz + kv_bdysiz;
1676  }
1677  kmr_free(buf, bufsiz);
1678  }
1679 
1680  for (int i = 0; i < merge->src_lst_count; i++) {
1681  fclose(mfs[i].fp);
1682  }
1683  kmr_free(mfs, sizeof(struct merge_file) * (size_t)merge->src_lst_count);
1684  kmr_ckpt_flush(mr, wfp);
1685  fclose(wfp);
1686 }
1687 
1688 /***************************************************************/
1689 /* Functions for logging */
1690 /***************************************************************/
1691 
1692 /* Initialize checkpoint log file */
1693 static void
1694 kmr_ckpt_init_log(KMR *mr, const char *log_fpath)
1695 {
1696  struct kmr_ckpt_log ckptld;
1697  memset((void *)&ckptld, 0, sizeof(ckptld));
1698  if (mr->ckpt_selective) {
1699  ckptld.mode = KMR_CKPT_SELECTIVE;
1700  } else {
1701  ckptld.mode = KMR_CKPT_ALL;
1702  }
1703  ckptld.nprocs = mr->nprocs;
1704  ckptld.rank = mr->rank;
1705  FILE *fp = kmr_ckpt_open_path(mr, log_fpath, "w");
1706  size_t size = offsetof(struct kmr_ckpt_log, data);
1707  size_t ret = fwrite((void *)&ckptld, size, 1, fp);
1708  if (ret != 1) {
1709  char msg[KMR_CKPT_MSGLEN];
1710  snprintf(msg, sizeof(msg),
1711  "Failed to write header of checkpoint log %s", log_fpath);
1712  kmr_error(mr, msg);
1713  }
1714  kmr_ckpt_flush(mr, fp);
1715  mr->ckpt_ctx->ckpt_log_fp = fp;
1716 }
1717 
1718 static void
1719 kmr_ckpt_fin_log(KMR *mr)
1720 {
1721  fclose(mr->ckpt_ctx->ckpt_log_fp);
1722 }
1723 
1724 static inline void
1725 kmr_ckpt_save_log_raw(KMR *mr, struct kmr_ckpt_log_entry *ckptle)
1726 {
1727  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
1728  size_t ret = fwrite((void *)ckptle, sizeof(struct kmr_ckpt_log_entry), 1,
1729  ckptctx->ckpt_log_fp);
1730  if (ret != 1) {
1731  char msg[KMR_CKPT_MSGLEN];
1732  snprintf(msg, sizeof(msg), "Failed to add checkpoint log");
1733  kmr_error(mr, msg);
1734  }
1735  kmr_ckpt_flush(mr, ckptctx->ckpt_log_fp);
1736 }
1737 
1738 static inline void
1739 kmr_ckpt_save_log2(KMR *mr, int state)
1740 {
1741  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
1742  struct kmr_ckpt_log_entry ckptle;
1743  ckptle.op_seqno = ckptctx->progress_counter;
1744  ckptle.kvi_id = ckptctx->cur_kvi_id;
1745  ckptle.kvo_id = ckptctx->cur_kvo_id;
1746  ckptle.state = state;
1747  ckptle.n_kvi = -1;
1748  ckptle.n_kvo = -1;
1749  kmr_ckpt_save_log_raw(mr, &ckptle);
1750 }
1751 
1752 static inline void
1753 kmr_ckpt_save_log4(KMR *mr, int state, long nkvi, long nkvo)
1754 {
1755  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
1756  struct kmr_ckpt_log_entry ckptle;
1757  ckptle.op_seqno = ckptctx->progress_counter;
1758  ckptle.kvi_id = ckptctx->cur_kvi_id;
1759  ckptle.kvo_id = ckptctx->cur_kvo_id;
1760  ckptle.state = state;
1761  ckptle.n_kvi = nkvi;
1762  ckptle.n_kvo = nkvo;
1763  kmr_ckpt_save_log_raw(mr, &ckptle);
1764 }
1765 
1766 static inline void
1767 kmr_ckpt_save_log_del(KMR *mr, int state, long kvs_id)
1768 {
1769  struct kmr_ckpt_log_entry ckptle;
1770  ckptle.op_seqno = -1;
1771  ckptle.kvi_id = kvs_id;
1772  ckptle.kvo_id = kvs_id;
1773  ckptle.state = state;
1774  ckptle.n_kvi = -1;
1775  ckptle.n_kvo = -1;
1776  kmr_ckpt_save_log_raw(mr, &ckptle);
1777 }
1778 
1779 static inline void
1780 kmr_ckpt_save_log_lock(KMR *mr, int state)
1781 {
1782  struct kmr_ckpt_log_entry ckptle;
1783  ckptle.op_seqno = -1;
1784  ckptle.kvi_id = KMR_CKPT_DUMMY_ID;
1785  ckptle.kvo_id = KMR_CKPT_DUMMY_ID;
1786  ckptle.state = state;
1787  ckptle.n_kvi = -1;
1788  ckptle.n_kvo = -1;
1789  kmr_ckpt_save_log_raw(mr, &ckptle);
1790 }
1791 
1792 /* Log the start of kvs operation.
1793  \param[in] mr MapReduce data type
1794 */
1795 static void
1796 kmr_ckpt_log_whole_start(KMR *mr)
1797 {
1798  kmr_ckpt_save_log2(mr, KMR_CKPT_LOG_WHOLE_START);
1799 }
1800 
1801 /* Log the end of kvs operation.
1802  \param[in] mr MapReduce data type
1803 */
1804 static void
1805 kmr_ckpt_log_whole_finish(KMR *mr)
1806 {
1807  kmr_ckpt_save_log2(mr, KMR_CKPT_LOG_WHOLE_FINISH);
1808 }
1809 
1810 /* Log the start of kvs operation.
1811  \param[in] mr MapReduce data type
1812  \param[in] kvo output KVS
1813 */
1814 static void
1815 kmr_ckpt_log_block_start(KMR *mr, KMR_KVS *kvo)
1816 {
1817  long nkvi = kmr_ckpt_first_unprocessed_kv(mr);
1818  long nkvo = (kvo == 0) ? 0 : kvo->c.element_count;
1819  kmr_ckpt_save_log4(mr, KMR_CKPT_LOG_BLOCK_START, nkvi, nkvo);
1820 }
1821 
1822 /* Log the progress of kvs operation.
1823  \param[in] mr MapReduce data type
1824  \param[in] nkvi number of processed kv in kvi
1825  \param[in] nkvo number of generated kv in kvo
1826 */
1827 static void
1828 kmr_ckpt_log_block_add(KMR *mr, long nkvi, long nkvo)
1829 {
1830  kmr_ckpt_save_log4(mr, KMR_CKPT_LOG_BLOCK_ADD, nkvi, nkvo);
1831 }
1832 
1833 /* Log the end of kvs operation.
1834  \param[in] mr MapReduce data type
1835  \param[in] kvo output KVS
1836 */
1837 static void
1838 kmr_ckpt_log_block_finish(KMR *mr)
1839 {
1840  kmr_ckpt_save_log2(mr, KMR_CKPT_LOG_BLOCK_FINISH);
1841 }
1842 
1843 /* Log the start of kvs operation.
1844  \param[in] mr MapReduce data type
1845  \param[in] kvo output KVS
1846 */
1847 static void
1848 kmr_ckpt_log_index_start(KMR *mr, KMR_KVS *kvo)
1849 {
1850  long nkvi = kmr_ckpt_first_unprocessed_kv(mr);
1851  long nkvo = (kvo == 0) ? 0 : kvo->c.element_count;
1852  kmr_ckpt_save_log4(mr, KMR_CKPT_LOG_INDEX_START, nkvi, nkvo);
1853 }
1854 
1855 /* Log the progress of kvs operation.
1856  \param[in] mr MapReduce data type
1857  \param[in] ikv_index index of processed kv in kvi
1858  \param[in] nkvo number of generated kv in kvo
1859 */
1860 static void
1861 kmr_ckpt_log_index_add(KMR *mr, long ikv_index, long nkvo)
1862 {
1863  kmr_ckpt_save_log4(mr, KMR_CKPT_LOG_INDEX_ADD, ikv_index, nkvo);
1864 }
1865 
1866 /* Log the end of kvs operation.
1867  \param[in] mr MapReduce data type
1868 */
1869 static void
1870 kmr_ckpt_log_index_finish(KMR *mr)
1871 {
1872  kmr_ckpt_save_log2(mr, KMR_CKPT_LOG_INDEX_FINISH);
1873 }
1874 
1875 /* Log the start of kvs delete.
1876  \param[in] mr MapReduce data type
1877  \param[in] kvs_id ID of target KVS
1878 */
1879 static void
1880 kmr_ckpt_log_delete_start(KMR *mr, long kvs_id)
1881 {
1882  kmr_ckpt_save_log_del(mr, KMR_CKPT_LOG_DELETE_START, kvs_id);
1883 }
1884 
1885 /* Log the end of kvs delete.
1886  \param[in] mr MapReduce data type
1887  \param[in] kvs_id ID of target KVS
1888 */
1889 static void
1890 kmr_ckpt_log_delete_finish(KMR *mr, long kvs_id)
1891 {
1892  kmr_ckpt_save_log_del(mr, KMR_CKPT_LOG_DELETE_FINISH, kvs_id);
1893 }
1894 
1895 /* Log that the spcified kvs can be deleted.
1896  (used in selective mode)
1897  \param[in] mr MapReduce data type
1898  \param[in] kvs_id ID of target KVS
1899 */
1900 static void
1901 kmr_ckpt_log_deletable(KMR *mr, long kvs_id)
1902 {
1903  kmr_ckpt_save_log_del(mr, KMR_CKPT_LOG_DELETABLE, kvs_id);
1904 }
1905 
1906 /* Log the progress of kvs operation.
1907  \param[in] mr MapReduce data type
1908 */
1909 static void
1910 kmr_ckpt_log_progress(KMR *mr)
1911 {
1912  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
1913  if (!(ckptctx->cur_kvi_id == KMR_CKPT_DUMMY_ID &&
1914  ckptctx->cur_kvo_id == KMR_CKPT_DUMMY_ID) ) {
1915  kmr_ckpt_save_log2(mr, KMR_CKPT_LOG_PROGRESS);
1916  }
1917 }
1918 
1919 /* Log the skip of kvs operation.
1920  \param[in] mr MapReduce data type
1921 */
1922 static void
1923 kmr_ckpt_log_skipped(KMR *mr)
1924 {
1925  kmr_ckpt_save_log2(mr, KMR_CKPT_LOG_SKIPPED);
1926 }
1927 
1928 /** Define the start position of code region that is referred when restart.
1929  If an execution is stopped due to an error in this region, restart with
1930  the different number of processes is not allowed.
1931 
1932  \param[in] mr MapReduce data type
1933 */
1935 {
1936  kmr_ckpt_save_log_lock(mr, KMR_CKPT_LOG_LOCK_START);
1937 }
1938 
1939 /** Define the end position of code region that is referred when restart.
1940  If an execution is stopped due to an error in this region, restart with
1941  the different number of processes is not allowed.
1942 
1943  \param[in] mr MapReduce data type
1944 */
1946 {
1947  kmr_ckpt_save_log_lock(mr, KMR_CKPT_LOG_LOCK_FINISH);
1948 }
1949 
1950 static FILE *
1951 kmr_ckpt_open_log(KMR *mr, const char *path, struct kmr_ckpt_log *log_hdr,
1952  unsigned long *size)
1953 {
1954  struct stat sb;
1955  int cc = stat(path, &sb);
1956  if (cc != 0) {
1957  char msg[KMR_CKPT_MSGLEN];
1958  snprintf(msg, sizeof(msg),
1959  "Failed to access a checkpoint log %s", path);
1960  kmr_error(mr, msg);
1961  }
1962  FILE *fp = kmr_ckpt_open_path(mr, path, "r");
1963  size_t hdrsz = offsetof(struct kmr_ckpt_log, data);
1964  size_t rc = fread((void *)log_hdr, hdrsz, 1, fp);
1965  if (rc != 1) {
1966  char msg[KMR_CKPT_MSGLEN];
1967  snprintf(msg, sizeof(msg),
1968  "Failed to read a checkpoint log %s", path);
1969  kmr_error(mr, msg);
1970  }
1971  assert(sb.st_size >= 0);
1972  assert((size_t)sb.st_size >= hdrsz);
1973  *size = (size_t)sb.st_size - hdrsz;
1974  return fp;
1975 }
1976 
1977 /***************************************************************/
1978 /* Functions for checkpoint data management */
1979 /***************************************************************/
1980 
1981 /* It returns 1 if a checkpoint data file should be written to disk.
1982  Otherwise it returns 0. */
1983 static _Bool
1984 kmr_ckpt_write_file_p(KMR *mr)
1985 {
1986  assert(mr->ckpt_enable);
1987  if (mr->ckpt_selective && !mr->ckpt_ctx->slct_cur_take_ckpt) {
1988  return 0;
1989  } else {
1990  return 1;
1991  }
1992 }
1993 
1994 /* This deletes a checkpoint data file of the specified kvs */
1995 static void
1996 kmr_ckpt_delete_ckpt_data(KMR *mr, long kvs_id)
1997 {
1998  char fpath[KMR_CKPT_PATHLEN];
1999  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
2000  kmr_ckpt_make_fname(ckptctx->ckpt_dname, KMR_CKPT_FNAME_PREFIX,
2001  KMR_CKPT_DATA, mr->rank, kvs_id, fpath, sizeof(fpath));
2002  int cc = access(fpath, F_OK);
2003  if (cc == 0) {
2004  kmr_ckpt_log_delete_start(mr, kvs_id);
2005  cc = unlink(fpath);
2006  assert(cc == 0);
2007  kmr_ckpt_log_delete_finish(mr, kvs_id);
2008  } else {
2009  /* checkpoint file does not exist. do nothing. */
2010  }
2011 }
2012 
2013 /* This deletes checkpoint files in specified directory.
2014  It also deletes the directory if it becomes empty.
2015  If the directory does not exist, it does nothing. */
2016 static void
2017 kmr_ckpt_delete_ckpt_files(KMR *mr, const char *target_dir, int rank)
2018 {
2019  struct stat sb;
2020  int cc = stat(target_dir, &sb);
2021  if (cc == 0) {
2022  if (!S_ISDIR(sb.st_mode)) {
2023  char msg[KMR_CKPT_MSGLEN];
2024  snprintf(msg, sizeof(msg),
2025  "File %s should not exist or "
2026  "if exists, shoud be a directory.", target_dir);
2027  kmr_error(mr, msg);
2028  }
2029  } else {
2030  return; /* directory does not exist. */
2031  }
2032 
2033  /* Remove all checkpoint data file */
2034  struct kmr_ckpt_data_file *dataflst = NULL;
2035  int nfiles;
2036  kmr_ckpt_get_data_flist(mr, target_dir, &dataflst, &nfiles, 0);
2037  for (int i = 0; i < nfiles; i++) {
2038  char fpath[KMR_CKPT_PATHLEN];
2039  snprintf(fpath, sizeof(fpath), "%s/%s", target_dir, dataflst[i].fname);
2040  cc = access(fpath, F_OK);
2041  if (cc == 0) {
2042  unlink(fpath);
2043  } else {
2044  fprintf(stderr,
2045  "Failed to delete checkpoint file %s on rank[%05d]\n",
2046  fpath, rank);
2047  }
2048  }
2049  if (dataflst != NULL) {
2050  kmr_free(dataflst, sizeof(struct kmr_ckpt_data_file) * (size_t)nfiles);
2051  }
2052  /* Remove checkpoint log file */
2053  {
2054  char fpath[KMR_CKPT_PATHLEN];
2055  kmr_ckpt_make_fname(target_dir, KMR_CKPT_FNAME_PREFIX, KMR_CKPT_LOG,
2056  rank, 0, fpath, sizeof(fpath));
2057  cc = access(fpath, F_OK);
2058  if (cc == 0) {
2059  unlink(fpath);
2060  }
2061  }
2062  /* Delete nprocs file on rank 0 */
2063  if (mr->rank == 0) {
2064  char fpath[KMR_CKPT_PATHLEN];
2065  memset(fpath, 0, sizeof(fpath));
2066  snprintf(fpath, sizeof(fpath), "%s/nprocs", target_dir);
2067  cc = access(fpath, F_OK);
2068  if (cc == 0) {
2069  unlink(fpath);
2070  }
2071  }
2072  /* Delete checkpoint directory */
2073  cc = rmdir(target_dir);
2074  assert(cc == 0);
2075 }
2076 
2077 /* It finds a checkpoint data file named FNAME from DNAME directory and
2078  stores the file info. to FILE. If SETALL is 1, it reads the file to
2079  fill in all fields of FILE. */
2080 static void
2081 kmr_ckpt_init_data_file(KMR *mr, const char *dname , const char *fname,
2082  _Bool setall, struct kmr_ckpt_data_file *file)
2083 {
2084  char fpath[KMR_CKPT_PATHLEN];
2085  snprintf(fpath, KMR_CKPT_PATHLEN, "%s/%s", dname, fname);
2086  int cc = access(fpath, F_OK);
2087  if (cc != 0) {
2088  char msg[KMR_CKPT_MSGLEN];
2089  snprintf(msg, sizeof(msg),
2090  "Failed to access checkpoint file %s", fpath);
2091  kmr_error(mr, msg);
2092  }
2093  if (setall) {
2094  struct kmr_ckpt_data hdr;
2095  size_t hdrsz = offsetof(struct kmr_ckpt_data, data);
2096  FILE *fp = kmr_ckpt_open_path(mr, fpath, "r");
2097  size_t rc = fread((void *)&hdr, hdrsz, 1, fp);
2098  if (rc == 1) {
2099  file->kvs_id = hdr.kvs_id;
2100  } else {
2101  char msg[KMR_CKPT_MSGLEN];
2102  snprintf(msg, sizeof(msg),
2103  "Failed to read checkpoint file %s. Ignore this file",
2104  fpath);
2105  kmr_warning(mr, 1, msg);
2106  file->kvs_id = KMR_CKPT_DUMMY_ID;
2107  file->checked = 1;
2108  }
2109  fclose(fp);
2110  }
2111  strncpy(file->fname, fname, sizeof(file->fname) - 1);
2112  strncpy(file->dname, dname, sizeof(file->dname) - 1);
2113 }
2114 
2115 /* Save all kv in kvs to a checkpoint data file.
2116  \param[in] kvs target KVS
2117 */
2118 static inline void
2119 kmr_ckpt_save_ckpt(KMR_KVS *kvs) {
2120  struct kmr_ckpt_ctx *ckptctx = kvs->c.mr->ckpt_ctx;
2121  size_t tsize = kvs->c.storage_netsize + offsetof(struct kmr_ckpt_data, data);
2122  void *buf = kmr_malloc(tsize);
2123  memset(buf, 0, tsize);
2124 
2125  struct kmr_ckpt_data *ckpt = (struct kmr_ckpt_data *)buf;
2126  ckpt->nprocs = kvs->c.mr->nprocs;
2127  ckpt->rank = kvs->c.mr->rank;
2128  ckpt->kvs_id = kvs->c.ckpt_kvs_id;
2129  ckpt->key_data = kvs->c.key_data;
2130  ckpt->value_data = kvs->c.value_data;
2131 
2132  enum kmr_kv_field keyf = kmr_unit_sized_or_opaque(kvs->c.key_data);
2133  enum kmr_kv_field valf = kmr_unit_sized_or_opaque(kvs->c.value_data);
2134 
2135  unsigned char *p = (unsigned char *)&ckpt->data[0];
2136  ckptctx->saved_current_block = kvs->c.current_block; // save current_block
2137  kvs->c.current_block = kvs->c.first_block;
2138  long cnt = 0;
2139  while (cnt < kvs->c.element_count) {
2140  assert(kvs->c.current_block != 0);
2141  struct kmr_kv_box ev;
2142  struct kmr_kvs_block *b = kvs->c.current_block;
2143  struct kmr_kvs_entry *e = kmr_kvs_first_entry(kvs, b);
2144  for (long i = 0; i < b->partial_element_count; i++) {
2145  assert(e != 0);
2146  ev = kmr_pick_kv(e, kvs);
2147  kmr_poke_kv2((struct kmr_kvs_entry *)p, ev, 0, keyf, valf, 0);
2148  p += kmr_kvs_entry_netsize((struct kmr_kvs_entry *)p);
2149  e = kmr_kvs_next(kvs, e, 1);
2150  cnt++;
2151  }
2152  kvs->c.current_block = b->next;
2153  }
2154  kvs->c.current_block = ckptctx->saved_current_block; // restore current_block
2155 
2156  FILE *fp = kmr_ckpt_open(kvs, "w");
2157  size_t ret = fwrite(buf, tsize, 1, fp);
2158  if (ret != 1) {
2159  char msg[KMR_CKPT_MSGLEN];
2160  snprintf(msg, sizeof(msg), "Checkpoint: save checkpoint error write failed");
2161  kmr_error(kvs->c.mr, msg);
2162  }
2163  kmr_ckpt_flush(kvs->c.mr, fp);
2164  fclose(fp);
2165  if (buf != NULL) {
2166  free(buf);
2167  }
2168 }
2169 
2170 /* Create a checkpoint file only with header and open it */
2171 static inline void
2172 kmr_ckpt_kv_record_init_data(KMR *mr, KMR_KVS *kvs)
2173 {
2174  if (kvs == 0) {
2175  mr->ckpt_ctx->ckpt_data_fp = NULL;
2176  return;
2177  }
2178 
2179  FILE *fp;
2180  char fpath[KMR_CKPT_PATHLEN];
2181  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
2182  kmr_ckpt_make_fname(ckptctx->ckpt_dname, KMR_CKPT_FNAME_PREFIX,
2183  KMR_CKPT_DATA, mr->rank, kvs->c.ckpt_kvs_id,
2184  fpath, sizeof(fpath));
2185  int cc = access(fpath, W_OK);
2186  if (cc == 0) {
2187  /* if checkpoint already exist, open it. */
2188  fp = kmr_ckpt_open_path(mr, fpath, "a+");
2189  } else if (cc < 0 && errno == ENOENT) {
2190  /* if checkpoint does not exist, create it. */
2191  struct kmr_ckpt_data ckpt;
2192  memset((void *)&ckpt, 0, sizeof(ckpt));
2193  ckpt.nprocs = mr->nprocs;
2194  ckpt.rank = mr->rank;
2195  ckpt.kvs_id = kvs->c.ckpt_kvs_id;
2196  ckpt.key_data = kvs->c.key_data;
2197  ckpt.value_data = kvs->c.value_data;
2198  fp = kmr_ckpt_open_path(mr, fpath, "w+");
2199  size_t size = offsetof(struct kmr_ckpt_data, data);
2200  size_t ret = fwrite((void *)&ckpt, size, 1, fp);
2201  if (ret != 1) {
2202  char msg[KMR_CKPT_MSGLEN];
2203  snprintf(msg, sizeof(msg),
2204  "Failed to write header of checkpoint file %s", fpath);
2205  kmr_error(mr, msg);
2206  }
2207  kmr_ckpt_flush(mr, fp);
2208  } else {
2209  assert(0);
2210  }
2211  ckptctx->ckpt_data_fp = fp;
2212 }
2213 
2214 /* Initialization for KV recording while executing Map/Reduce.
2215  \param[in] kvi input KVS
2216  \param[in] kvo output KVS
2217 */
2218 static void
2219 kmr_ckpt_kv_record_init(KMR *mr, KMR_KVS *kvo)
2220 {
2221  /* initialize kvo checkpoint file */
2222  kmr_ckpt_kv_record_init_data(mr, kvo);
2223  /* set element_count, adding_point, currnet_block to ckpt_ctx */
2224  if (kvo != 0) {
2225  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
2226  ckptctx->saved_current_block = kvo->c.current_block;
2227  ckptctx->saved_adding_point = kvo->c.adding_point;
2228  ckptctx->saved_element_count = kvo->c.element_count;
2229  }
2230 }
2231 
2232 /* Add new kv in kvs to checkpoint data.
2233  As a result of Map/Reduce operation, newly generated kv(s)
2234  are added to checkpoint data file.
2235  \param[in] kvo target KVS
2236  \return number of kv added to a checkpoint data
2237 */
2238 static long
2239 kmr_ckpt_kv_record_add(KMR_KVS *kvo)
2240 {
2241  if (kvo == 0) {
2242  return 0;
2243  }
2244 
2245  struct kmr_ckpt_ctx *ckptctx = kvo->c.mr->ckpt_ctx;
2246  assert(ckptctx->ckpt_data_fp != NULL);
2247  long cnt = kvo->c.element_count - ckptctx->saved_element_count;
2248  assert(cnt >= 0);
2249  struct kmr_kvs_block *b = ckptctx->saved_current_block;
2250  if (b == 0) {
2251  b = kvo->c.first_block;
2252  }
2253  struct kmr_kvs_entry *e = ckptctx->saved_adding_point;
2254  if (e == 0) {
2255  e = kmr_kvs_first_entry(kvo, b);
2256  }
2257 
2258  for (long i = 0; i < cnt; i++) {
2259  if (kmr_kvs_entry_tail_p(e)) {
2260  b = b->next;
2261  assert(b != 0);
2262  e = kmr_kvs_first_entry(kvo, b);
2263  }
2264  /* save data in e as checkpoint data. */
2265  size_t size = kmr_kvs_entry_netsize(e);
2266  size_t ret = fwrite((void *)e, size, 1, ckptctx->ckpt_data_fp);
2267  if (ret != 1) {
2268  char msg[KMR_CKPT_MSGLEN];
2269  snprintf(msg, sizeof(msg),
2270  "Failed to add kv to a checkpoint file");
2271  kmr_error(kvo->c.mr, msg);
2272  }
2273  e = kmr_kvs_next_entry(kvo, e);
2274  }
2275  kmr_ckpt_flush(kvo->c.mr, ckptctx->ckpt_data_fp);
2276  ckptctx->saved_current_block = b;
2277  ckptctx->saved_adding_point = e;
2278  ckptctx->saved_element_count = kvo->c.element_count;
2279  return cnt;
2280 }
2281 
2282 /* Finish KV recording
2283  \param[in] kvi input KVS
2284  \param[in] kvo output KVS
2285 */
2286 static void
2287 kmr_ckpt_kv_record_fin(KMR *mr)
2288 {
2289  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
2290  /* close opened checkpoint data file. */
2291  if (ckptctx->ckpt_data_fp != NULL) {
2292  kmr_ckpt_flush(mr, ckptctx->ckpt_data_fp);
2293  fclose(ckptctx->ckpt_data_fp);
2294  }
2295  /* cleanup context. */
2296  ckptctx->ckpt_data_fp = NULL;
2297  ckptctx->saved_element_count = 0;
2298  ckptctx->saved_adding_point = NULL;
2299  ckptctx->saved_current_block = NULL;
2300 }
2301 
2302 /* List up ckeckpoint data files in the specified directory DNAME.
2303  It sets files to FLIST and its count to NFILES.
2304  If SETALL is 1, it reads data files to initialize all fields
2305  of structure. */
2306 static void
2307 kmr_ckpt_get_data_flist(KMR *mr, const char *dname,
2308  struct kmr_ckpt_data_file **flist, int *nfiles,
2309  _Bool setall)
2310 {
2311  struct stat sb;
2312  int cc = stat(dname, &sb);
2313  if (cc < 0) {
2314  *nfiles = 0;
2315  return;
2316  }
2317  if (!S_ISDIR(sb.st_mode)) {
2318  fprintf(stderr, "File %s is not a directory.\n", dname);
2319  *nfiles = 0;
2320  return;
2321  }
2322 
2323  size_t direntsz;
2324  long nmax = pathconf(dname, _PC_NAME_MAX);
2325  if (nmax == -1) {
2326  direntsz = (64 * 1024);
2327  } else {
2328  direntsz = (offsetof(struct dirent, d_name) + (size_t)nmax + 1);
2329  }
2330  DIR *d;
2331  struct dirent *dent;
2332  char b[direntsz];
2333 
2334  d = opendir(dname);
2335  if (d == NULL) {
2336  fprintf(stderr, "Failed to open directory %s.\n", dname);
2337  *nfiles = 0;
2338  return;
2339  }
2340 
2341  char prefix[KMR_CKPT_PATHLEN];
2342  snprintf(prefix, KMR_CKPT_PATHLEN, KMR_CKPT_FNAME_PREFIX"_data_");
2343  int cnt = 0;
2344  while (readdir_r(d, (void *)b, &dent) == 0) {
2345  if (dent == NULL) {
2346  break;
2347  }
2348  cc = strncmp(dent->d_name, prefix, strlen(prefix));
2349  if (cc == 0) {
2350  cnt++;
2351  }
2352  }
2353 
2354  size_t siz = sizeof(struct kmr_ckpt_data_file) * (size_t)cnt;
2355  struct kmr_ckpt_data_file *dataflst = kmr_malloc(siz);
2356  memset(dataflst, 0, siz);
2357 
2358  rewinddir(d);
2359  cnt = 0;
2360  while (readdir_r(d, (void *)b, &dent) == 0) {
2361  if (dent == NULL) {
2362  break;
2363  }
2364  cc = strncmp(dent->d_name, prefix, strlen(prefix));
2365  if (cc == 0) {
2366  kmr_ckpt_init_data_file(mr, dname, dent->d_name, setall,
2367  &dataflst[cnt]);
2368  cnt++;
2369  }
2370  }
2371  (void)closedir(d);
2372 
2373  *flist = dataflst;
2374  *nfiles = cnt;
2375 }
2376 
2377 /* save mr->nprocs to file. */
2378 static void
2379 kmr_ckpt_save_nprocs(KMR *mr, const char *dname)
2380 {
2381  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
2382  const char *target = (dname != 0) ? dname : ckptctx->ckpt_dname;
2383  char fpath[KMR_CKPT_PATHLEN], wstring[128], msg[KMR_CKPT_MSGLEN];
2384  memset(fpath, 0, sizeof(fpath));
2385  snprintf(fpath, sizeof(fpath), "%s/nprocs", target);
2386  int cc = access(fpath, R_OK);
2387  if (cc == 0) {
2388  return;
2389  } else {
2390  FILE *fp = kmr_ckpt_open_path(mr, fpath, "w");
2391  if (fp == NULL) {
2392  snprintf(msg, sizeof(msg),
2393  "Failed to open nprocs file %s", fpath);
2394  kmr_error(mr, msg);
2395  }
2396  memset(wstring, 0, sizeof(wstring));
2397  snprintf(wstring, sizeof(wstring), "nprocs=%d\n", mr->nprocs);
2398  size_t ret = fwrite(wstring, strlen(wstring), 1, fp);
2399  if (ret != 1) {
2400  snprintf(msg, sizeof(msg), "Failed to save nprocs to file %s",
2401  fpath);
2402  kmr_error(mr, msg);
2403  }
2404  kmr_ckpt_flush(mr, fp);
2405  fclose(fp);
2406  }
2407 }
2408 
2409 /* Open KVS checkpoint data file. */
2410 static FILE *
2411 kmr_ckpt_open(KMR_KVS *kvs, const char *mode)
2412 {
2413  char fpath[KMR_CKPT_PATHLEN];
2414  KMR *mr = kvs->c.mr;
2415  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
2416  kmr_ckpt_make_fname(ckptctx->ckpt_dname, KMR_CKPT_FNAME_PREFIX,
2417  KMR_CKPT_DATA, mr->rank, kvs->c.ckpt_kvs_id,
2418  fpath, sizeof(fpath));
2419  FILE *fp = kmr_ckpt_open_path(mr, fpath, mode);
2420  return fp;
2421 }
2422 
2423 /* Open file using file path. */
2424 static FILE *
2425 kmr_ckpt_open_path(KMR *mr, const char *fpath, const char *mode)
2426 {
2427  FILE *fp = fopen(fpath, mode);
2428  if (fp == NULL) {
2429  char msg[KMR_CKPT_MSGLEN];
2430  snprintf(msg, sizeof(msg),
2431  "Failed to open a checkpoint file %s", fpath);
2432  kmr_error(mr, msg);
2433  }
2434  int cc = fcntl(fileno(fp), F_SETFD, FD_CLOEXEC);
2435  assert(cc == 0);
2436  return fp;
2437 }
2438 
2439 /* Compose checkpoint file name. */
2440 static void
2441 kmr_ckpt_make_fname(const char *dirname, const char *fprefix,
2442  enum kmr_ckpt_type type,
2443  int rank, long kvs_id, char *fpath, size_t len)
2444 {
2445  memset(fpath, 0, len);
2446  assert(type == KMR_CKPT_DATA || type == KMR_CKPT_LOG);
2447  if (type == KMR_CKPT_DATA) {
2448  snprintf(fpath, len-1, "%s/%s_data_%05d_%03ld",
2449  dirname, fprefix, rank, kvs_id);
2450  } else if (type == KMR_CKPT_LOG) {
2451  snprintf(fpath, len-1, "%s/%s_log_%05d",
2452  dirname, fprefix, rank);
2453  }
2454 }
2455 
2456 /* Flush write to the specified file */
2457 static void
2458 kmr_ckpt_flush(KMR *mr, FILE *fp)
2459 {
2460  fflush(fp);
2461  if (!mr->ckpt_no_fsync) {
2462  int cc = fsync(fileno(fp));
2463  assert(cc == 0);
2464  }
2465 }
2466 
2467 /***************************************************************/
2468 /* Public functions */
2469 /***************************************************************/
2470 
2471 /** Check if checkpoint/restart is enabled.
2472 
2473  \param[in] mr MapReduce data type
2474  \return It returns 1 if checkpoint/restart is enabled.
2475  Otherwise it returns 0.
2476 */
2477 int
2479 {
2480  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
2481  if (mr->ckpt_enable == 1 && ckptctx->initialized) {
2482  return 1;
2483  } else {
2484  return 0;
2485  }
2486 }
2487 
2488 /** It temporally disables checkpoint/restart.
2489 
2490  \param[in] mr MapReduce data type
2491  \return If it succeeds disabling, it returns a lock id.
2492  Otherwise it returns 0.
2493 */
2495 {
2496  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
2497  if (mr->ckpt_enable == 1 && ckptctx->initialized &&
2498  ckptctx->lock_id == 0) {
2499  mr->ckpt_enable = 0;
2500  ckptctx->lock_id = ++ckptctx->lock_counter;
2501  return ckptctx->lock_id;
2502  } else {
2503  return 0;
2504  }
2505 }
2506 
2507 /** It temporally enables checkpoint/restart which has been
2508  disabled by calling kmr_ckpt_disable_ckpt().
2509 
2510  \param[in] mr MapReduce data type
2511  \param[in] lock_id ID of lock returned by kmr_ckpt_disable_ckpt()
2512  \return If it succeeds enabling, it returns 1.
2513  Otherwise it returns 0.
2514 */
2515 int kmr_ckpt_enable_ckpt(KMR *mr, int lock_id)
2516 {
2517  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
2518  if (mr->ckpt_enable == 0 && ckptctx->initialized &&
2519  ckptctx->lock_id == lock_id) {
2520  mr->ckpt_enable = 1;
2521  ckptctx->lock_id = 0;
2522  return 1;
2523  } else {
2524  return 0;
2525  }
2526 }
2527 
2528 /** It returns the index of the first unprocessed key-value in the input KVS.
2529 
2530  \param[in] mr MapReduce data type
2531  \return It returns the index of the first unprocessed key-value
2532  in the input KVS.
2533 */
2534 long
2536 {
2537  if (mr->ckpt_selective) {
2538  return 0;
2539  }
2540  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
2541  long op_seqno = ckptctx->progress_counter;
2542  long start_from = 0;
2543  for (int i = 0; i < ckptctx->kv_positions_count; i++) {
2544  if (ckptctx->kv_positions[i].op_seqno == op_seqno) {
2545  start_from = ckptctx->kv_positions[i].start_from;
2546  break;
2547  }
2548  }
2549  return start_from;
2550 }
2551 
2552 /** It restores checkpoint data to kvs.
2553 
2554  \param[out] kvs an KVS where the checkpoint data will be restored
2555 */
2556 void
2558 {
2559  KMR *mr = kvs->c.mr;
2560  char fpath[KMR_CKPT_PATHLEN];
2561  kmr_ckpt_make_fname(mr->ckpt_ctx->ckpt_dname, KMR_CKPT_FNAME_PREFIX,
2562  KMR_CKPT_DATA, mr->rank, kvs->c.ckpt_kvs_id,
2563  fpath, sizeof(fpath));
2564  int cc = access(fpath, R_OK);
2565  if (cc != 0) {
2566  /* checkpoint file does not exist. */
2567  return;
2568  }
2569  struct stat sb;
2570  cc = stat(fpath, &sb);
2571  if (cc != 0) {
2572  char msg[KMR_CKPT_MSGLEN];
2573  snprintf(msg, sizeof(msg),
2574  "Failed to access a checkpoint file %s", fpath);
2575  kmr_error(kvs->c.mr, msg);
2576  }
2577  size_t siz = (size_t)sb.st_size;
2578  void *buf = kmr_malloc(siz);
2579  FILE *fp = kmr_ckpt_open_path(kvs->c.mr, fpath, "r");
2580  size_t ret = fread(buf, siz, 1, fp);
2581  if (ret != 1) {
2582  char msg[KMR_CKPT_MSGLEN];
2583  snprintf(msg, sizeof(msg),
2584  "Failed to load a checkpoint file %s", fpath);
2585  kmr_error(kvs->c.mr, msg);
2586  }
2587  fclose(fp);
2588 
2589  struct kmr_ckpt_data *ckpt = (struct kmr_ckpt_data *)buf;
2590  size_t cur_siz = offsetof(struct kmr_ckpt_data, data);
2591  struct kmr_kvs_entry *e = (struct kmr_kvs_entry *)&ckpt->data[0];
2592  while (cur_siz < siz) {
2593  struct kmr_kv_box kv;
2594 // if ( kmr_kvs_entry_tail_p(e) ) {
2595 // break;
2596 // }
2597  assert(e != 0);
2598  kv = kmr_pick_kv(e, kvs); // This is ok.
2599  kmr_add_kv(kvs, kv);
2600  cur_siz += kmr_kvs_entry_netsize(e);
2601  e = kmr_kvs_next_entry(kvs, e);
2602  }
2603  kmr_free(buf, siz);
2604  assert(cur_siz == siz);
2605 }
2606 
2607 /** It removes checkpoint data file.
2608 
2609  \param[in] kvs KVS whose checkpoint data is removed
2610 */
2611 void
2613 {
2614  KMR *mr = kvs->c.mr;
2615  if (!mr->ckpt_selective) {
2616  /* delete the checkpoint data file */
2617  kmr_ckpt_delete_ckpt_data(mr, kvs->c.ckpt_kvs_id);
2618  } else {
2619  /* just mark as deletable */
2620  char fpath[KMR_CKPT_PATHLEN];
2621  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
2622  kmr_ckpt_make_fname(ckptctx->ckpt_dname, KMR_CKPT_FNAME_PREFIX,
2623  KMR_CKPT_DATA, mr->rank, kvs->c.ckpt_kvs_id,
2624  fpath, sizeof(fpath));
2625  int cc = access(fpath, F_OK);
2626  if (cc == 0) {
2627  kmr_ckpt_log_deletable(mr, kvs->c.ckpt_kvs_id);
2628  }
2629  }
2630 }
2631 
2632 /** It saves all key-value pairs in the output KVS to a checkpoint data file.
2633 
2634  \param[in] kvi input KVS
2635  \param[in] kvo output KVS
2636 */
2637 void
2639 {
2640  if (kmr_ckpt_write_file_p(mr)) {
2641  kmr_ckpt_log_whole_start(mr);
2642  kmr_ckpt_save_ckpt(kvo);
2643  kmr_ckpt_log_whole_finish(mr);
2644  }
2645 }
2646 
2647 /** It initializes saving blocks of key-value pairs of the output KVS to
2648  a checkpoint data file.
2649 
2650  \param[in] mr MapReduce data type
2651  \param[in] kvo output KVS
2652 */
2653 void
2655 {
2656  if (!mr->ckpt_selective) {
2657  kmr_ckpt_log_block_start(mr, kvo);
2658  kmr_ckpt_kv_record_init(mr, kvo);
2659  }
2660 }
2661 
2662 /** It adds a new block of key-value pairs of the output KVS to the
2663  checkpoint data file.
2664 
2665  \param[in] mr MapReduce data type
2666  \param[in] kvo output KVS
2667  \param[in] nkvi number of processed kv in the input KVS
2668 */
2669 void
2671 {
2672  if (!mr->ckpt_selective) {
2673  long nkvo = kmr_ckpt_kv_record_add(kvo);
2674  kmr_ckpt_log_block_add(mr, nkvi, nkvo);
2675  }
2676 }
2677 
2678 /** It finalizes saving block of key-value pairs of the output KVS to
2679  the checkpoint data file.
2680 
2681  \param[in] mr MapReduce data type
2682  \param[in] kvo output KVS
2683 */
2684 void
2686 {
2687  if (!mr->ckpt_selective) {
2688  kmr_ckpt_kv_record_fin(mr);
2689  kmr_ckpt_log_block_finish(mr);
2690  } else {
2691  /* incase of selective mode, save all key-values here */
2692  kmr_ckpt_save_kvo_whole(mr, kvo);
2693  }
2694 }
2695 
2696 /** It initializes saving indexed key-value pairs of the output KVS
2697  to a checkpoint data file.
2698 
2699  \param[in] mr MapReduce data type
2700  \param[in] kvo output KVS
2701 */
2702 void
2704 {
2705  if (!mr->ckpt_selective) {
2706  kmr_ckpt_log_index_start(mr, kvo);
2707  kmr_ckpt_kv_record_init(mr, kvo);
2708  }
2709 }
2710 
2711 /** It adds new key-value pairs of the output KVS to the checkpoint data file.
2712 
2713  \param[in] mr MapReduce data type
2714  \param[in] kvo output KVS
2715  \param[in] ikv_index index of processed kv in the input KVS
2716 */
2717 void
2718 kmr_ckpt_save_kvo_each_add(KMR *mr, KMR_KVS *kvo, long ikv_index)
2719 {
2720  if (!mr->ckpt_selective) {
2721  long nkvo = kmr_ckpt_kv_record_add(kvo);
2722  kmr_ckpt_log_index_add(mr, ikv_index, nkvo);
2723  }
2724 }
2725 
2726 /** It finalizes saving indexed key-value pairs of the output KVS
2727  to the checkpoint data file.
2728 
2729  \param[in] mr MapReduce data type
2730  \param[in] kvo output KVS
2731 */
2732 void
2734 {
2735  if (!mr->ckpt_selective) {
2736  kmr_ckpt_kv_record_fin(mr);
2737  kmr_ckpt_log_index_finish(mr);
2738  } else {
2739  /* incase of selective mode, save all key-values here */
2740  kmr_ckpt_save_kvo_whole(mr, kvo);
2741  }
2742 }
2743 
2744 /** It initializes a progress of MapReduce checkpointing.
2745 
2746  \param[in] kvi input KVS to a MapReduce operation
2747  \param[in] kvo output KVS to the MapReduce operation
2748  \param[in] opt struct kmr_option
2749  \return It returns 1 if operation can be skipped.
2750  Otherwise it returns 0.
2751 */
2752 int
2754 {
2755  KMR *mr = (kvo != 0) ? kvo->c.mr : kvi->c.mr;
2756  if (opt.keep_open) {
2757  char msg[KMR_CKPT_MSGLEN];
2758  snprintf(msg, sizeof(msg),
2759  "'keep_open' option can't be used when checkpoint/restart"
2760  " is enabled");
2761  kmr_error(mr, msg);
2762  }
2763 
2764  /* initialize progress */
2765  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
2766  ckptctx->progress_counter += 1;
2767  if (kvi != 0) {
2768  kvi->c.ckpt_consumed_op = ckptctx->progress_counter;
2769  }
2770  if (kvo != 0) {
2771  kvo->c.ckpt_generated_op = ckptctx->progress_counter;
2772  }
2773  assert(ckptctx->cur_kvi_id == KMR_CKPT_DUMMY_ID);
2774  assert(ckptctx->cur_kvo_id == KMR_CKPT_DUMMY_ID);
2775  if (kvi != 0) {
2776  ckptctx->cur_kvi_id = kvi->c.ckpt_kvs_id;
2777  } else {
2778  ckptctx->cur_kvi_id = KMR_CKPT_DUMMY_ID;
2779  }
2780  if (kvo != 0) {
2781  ckptctx->cur_kvo_id = kvo->c.ckpt_kvs_id;
2782  } else {
2783  ckptctx->cur_kvo_id = KMR_CKPT_DUMMY_ID;
2784  }
2785 
2786  /* check if the operation can be skipped or not */
2787  int do_skip;
2788  long progress = ckptctx->progress_counter;
2789  if (!mr->ckpt_selective) {
2790  /* ckpt all */
2791  if (progress <= ckptctx->prev_global_progress) {
2792  do_skip = 1;
2793  } else if (progress > ckptctx->prev_global_progress &&
2794  progress <= ckptctx->prev_progress ) {
2795  if (kvi == 0) { /* In case of kmr_map_once */
2796  do_skip = 0;
2797  } else {
2798  if (kvi->c.element_count == 0) {
2799  do_skip = 1;
2800  } else {
2801  do_skip = 0;
2802  }
2803  }
2804  } else { /* progress > ckptctx->prev_progress */
2805  do_skip = 0;
2806  }
2807  } else {
2808  /* ckpt selective */
2809  if (progress <= ckptctx->prev_global_progress) {
2810  do_skip = 1;
2811  } else if (progress > ckptctx->prev_global_progress &&
2812  progress <= ckptctx->prev_progress ) {
2813  long v = kmr_ckpt_int_list_del(ckptctx->slct_skip_ops, progress);
2814  if (v == progress) {
2815  do_skip = 1;
2816  } else {
2817  do_skip = 0;
2818  }
2819  } else { /* progress > ckptctx->prev_progress */
2820  do_skip = 0;
2821  }
2822  }
2823  if (do_skip == 1) {
2824  kmr_ckpt_log_skipped(mr);
2825  ckptctx->cur_kvi_id = KMR_CKPT_DUMMY_ID;
2826  ckptctx->cur_kvo_id = KMR_CKPT_DUMMY_ID;
2827  return 1;
2828  }
2829 
2830  /* initialize a checkpoint */
2831  if (mr->ckpt_selective) {
2832  if (opt.take_ckpt) {
2833  ckptctx->slct_cur_take_ckpt = 1;
2834  }
2835  }
2836 
2837  return 0;
2838 }
2839 
2840 /** It finalizes the progress of MapReduce checkpointing.
2841 
2842  \param[in] mr MapReduce data type
2843 */
2844 void
2846 {
2847  kmr_ckpt_log_progress(mr);
2848  struct kmr_ckpt_ctx *ckptctx = mr->ckpt_ctx;
2849  if (mr->ckpt_selective) {
2850  ckptctx->slct_cur_take_ckpt = 0;
2851  }
2852  ckptctx->cur_kvi_id = KMR_CKPT_DUMMY_ID;
2853  ckptctx->cur_kvo_id = KMR_CKPT_DUMMY_ID;
2854 }
2855 
2856 /***************************************************************/
2857 /* Utility functions */
2858 /***************************************************************/
2859 
2860 /* Initialize list */
2861 static void
2862 kmr_ckpt_list_init(struct kmr_ckpt_list *list,
2863  kmr_ckpt_list_alocfn_t alocfn,
2864  kmr_ckpt_list_freefn_t freefn,
2865  kmr_ckpt_list_compfn_t compfn)
2866 {
2867  list->head = 0;
2868  list->tail = 0;
2869  list->size = 0;
2870  list->alocfn = alocfn;
2871  list->freefn = freefn;
2872  list->compfn = compfn;
2873 }
2874 
2875 /* Clear all data in the list */
2876 static void
2877 kmr_ckpt_list_free(struct kmr_ckpt_list *list)
2878 {
2879  struct kmr_ckpt_list_item *item, *del;
2880  for (item = list->head; item != 0; ) {
2881  del = item;
2882  item = item->next;
2883  (*(list->freefn))(del);
2884  }
2885  kmr_ckpt_list_init(list, list->alocfn, list->freefn, list->compfn);
2886 }
2887 
2888 /* Add an item to the tail of the list.
2889  If size of list is over max count, it deletes the oldest item. */
2890 static void
2891 kmr_ckpt_list_add(struct kmr_ckpt_list *list, void *val)
2892 {
2893  struct kmr_ckpt_list_item *item;
2894  size_t isize = sizeof(struct kmr_ckpt_list_item);
2895  if (list->size == KMR_CKPT_LIST_MAX) {
2896  item = list->head;
2897  list->head = list->head->next;
2898  list->head->prev = 0;
2899  (*(list->freefn))(item->val);
2900  kmr_free(item, isize);
2901  list->size -= 1;
2902  }
2903  item = (struct kmr_ckpt_list_item *)kmr_malloc(isize);
2904  item->val = (*(list->alocfn))(val);
2905  item->next = 0;
2906  item->prev = 0;
2907  if (list->head == 0) {
2908  list->head = item;
2909  list->tail = item;
2910  } else {
2911  list->tail->next = item;
2912  item->prev = list->tail;
2913  list->tail = item;
2914  }
2915  list->size += 1;
2916 }
2917 
2918 /* Delete the specified value from the list.
2919  If it successes to delete it returns the value, otherwise it returns NULL.
2920  It searches from the head. */
2921 static void *
2922 kmr_ckpt_list_del(struct kmr_ckpt_list *list, void *val)
2923 {
2924  _Bool found = 0;
2925  struct kmr_ckpt_list_item *item;
2926  for (item = list->head; item != 0; item = item->next) {
2927  if ((*(list->compfn))(item->val, val) == 0) {
2928  found = 1;
2929  break;
2930  }
2931  }
2932  if (found) {
2933  void *ret = item->val;
2934  if (!(item == list->head || item == list->tail)) {
2935  item->prev->next = item->next;
2936  item->next->prev = item->prev;
2937  }
2938  if (item == list->head) {
2939  list->head = item->next;
2940  if (list->head != 0) {
2941  list->head->prev = 0;
2942  }
2943  }
2944  if (item == list->tail) {
2945  list->tail = item->prev;
2946  if (list->tail != 0) {
2947  list->tail->next = 0;
2948  }
2949  }
2950  kmr_free(item, sizeof(struct kmr_ckpt_list_item));
2951  list->size -= 1;
2952  return ret;
2953  } else {
2954  return 0;
2955  }
2956 }
2957 
2958 /* It searchs the specified value from the head of the list.
2959  If the value is found it returns the value, otherwise it returns NULL. */
2960 static void *
2961 kmr_ckpt_list_search(struct kmr_ckpt_list *list, void *val)
2962 {
2963  struct kmr_ckpt_list_item *item;
2964  for (item = list->head; item != 0; item = item->next) {
2965  if ((*(list->compfn))(item->val, val) == 0) {
2966  return item->val;
2967  }
2968  }
2969  return 0;
2970 }
2971 
2972 /* It searchs the specified value from the tail of the list.
2973  If the value is found it returns the value, otherwise it returns NULL. */
2974 static void *
2975 kmr_ckpt_list_rsearch(struct kmr_ckpt_list *list, void *val)
2976 {
2977  struct kmr_ckpt_list_item *item;
2978  for (item = list->tail; item != 0; item = item->prev) {
2979  if ((*(list->compfn))(item->val, val) == 0) {
2980  return item->val;
2981  }
2982  }
2983  return 0;
2984 }
2985 
/* Allocator for an integer list item.  VAL must point to a long; a
   freshly allocated copy of that long is returned. */
static void *
kmr_ckpt_int_list_alocfn(void *val)
{
    const long *src = (long *)val;
    long *dup = kmr_malloc(sizeof(long));
    dup[0] = src[0];
    return dup;
}
2994 
/* Deallocator for an integer list item.  Releases VAL through
   kmr_free(), reporting the size of a long. */
static void
kmr_ckpt_int_list_freefn(void *val)
{
    void *stored = val;
    kmr_free(stored, sizeof(long));
}
3001 
/* Comparator for integer list items.  V1 and V2 must each point to a
   long.  Returns 1 when *V1 > *V2, -1 when *V1 < *V2, and 0 when the
   two values are equal. */
static int
kmr_ckpt_int_list_compfn(void *v1, void *v2)
{
    const long lhs = *(long *)v1;
    const long rhs = *(long *)v2;
    /* Each relational test yields 0 or 1, so the difference is the
       three-way result. */
    return (lhs > rhs) - (lhs < rhs);
}
3016 
3017 /* Initialize integer list */
3018 static void
3019 kmr_ckpt_int_list_init(struct kmr_ckpt_list *list)
3020 {
3021  kmr_ckpt_list_init(list, kmr_ckpt_int_list_alocfn,
3022  kmr_ckpt_int_list_freefn, kmr_ckpt_int_list_compfn);
3023 }
3024 
/* Clears an integer list.  This is a thin wrapper that forwards LIST
   to the generic kmr_ckpt_list_free(). */
static void
kmr_ckpt_int_list_free(struct kmr_ckpt_list *list)
{
    kmr_ckpt_list_free(list);
}
3031 
/* Appends the integer VAL to the tail of LIST via the generic
   kmr_ckpt_list_add(), which stores its own copy of the value.
   Any eviction of the oldest entry when the list is full is the
   generic function's responsibility. */
static void
kmr_ckpt_int_list_add(struct kmr_ckpt_list *list, long val)
{
    long *src = &val;
    kmr_ckpt_list_add(list, src);
}
3039 
/* Deletes the integer VAL from LIST, searching from the head.
   Returns the deleted value on success, or -1 when VAL is not in the
   list (so -1 cannot be distinguished from a stored -1).

   kmr_ckpt_list_del() only unlinks and frees the list node; the
   heap-allocated long it hands back is owned by the caller.  It is
   therefore released here after its content has been read, otherwise
   every successful deletion would leak one long. */
static long
kmr_ckpt_int_list_del(struct kmr_ckpt_list *list, long val)
{
    long *stored = (long *)kmr_ckpt_list_del(list, &val);
    if (stored == 0) {
        return -1;
    }
    long removed = *stored;
    kmr_ckpt_int_list_freefn(stored);
    return removed;
}
3053 
/* Searches LIST from the head for the integer VAL.  Returns the
   stored value when it is found and -1 otherwise. */
static long
kmr_ckpt_int_list_search(struct kmr_ckpt_list *list, long val)
{
    long *hit = (long *)kmr_ckpt_list_search(list, &val);
    if (hit == 0) {
        return -1;
    }
    return *hit;
}
3066 
/* Searches LIST from the tail for the integer VAL.  Returns the
   stored value when it is found and -1 otherwise. */
static long
kmr_ckpt_int_list_rsearch(struct kmr_ckpt_list *list, long val)
{
    long *hit = (long *)kmr_ckpt_list_rsearch(list, &val);
    if (hit == 0) {
        return -1;
    }
    return *hit;
}
3079 
#if 0
/* Self-test for the integer list (disabled by default).
   Set KMR_CKPT_LIST_MAX to 2 before calling this function; the
   assertions below expect the list to hold at most two entries and
   the oldest entry to disappear when a third one is added. */
static void test_kmr_ckpt_int_list(void)
{
    struct kmr_ckpt_list list;
    kmr_ckpt_int_list_init(&list);

    /* Operations on an empty list report "not found". */
    long v = kmr_ckpt_int_list_del(&list, 1);
    assert(v == -1);
    assert(list.size == 0);
    v = kmr_ckpt_int_list_search(&list, 1);
    assert(v == -1);
    assert(list.size == 0);
    v = kmr_ckpt_int_list_rsearch(&list, 1);
    assert(v == -1);
    assert(list.size == 0);

    /* Adding a third value must keep the size at two. */
    kmr_ckpt_int_list_add(&list, 10);
    assert(list.size == 1);
    kmr_ckpt_int_list_add(&list, 20);
    assert(list.size == 2);
    kmr_ckpt_int_list_add(&list, 30);
    assert(list.size == 2);

    /* 10 is expected to be gone; 20 and 30 remain. */
    v = kmr_ckpt_int_list_search(&list, 10);
    assert(v == -1);
    v = kmr_ckpt_int_list_rsearch(&list, 10);
    assert(v == -1);
    v = kmr_ckpt_int_list_search(&list, 20);
    assert(v == 20);
    v = kmr_ckpt_int_list_rsearch(&list, 20);
    assert(v == 20);
    v = kmr_ckpt_int_list_search(&list, 30);
    assert(v == 30);
    v = kmr_ckpt_int_list_rsearch(&list, 30);
    assert(v == 30);

    /* Deleting a missing value changes nothing; deleting the rest
       empties the list. */
    v = kmr_ckpt_int_list_del(&list, 1);
    assert(v == -1);
    assert(list.size == 2);
    v = kmr_ckpt_int_list_del(&list, 20);
    assert(v == 20);
    assert(list.size == 1);
    v = kmr_ckpt_int_list_del(&list, 30);
    assert(v == 30);
    assert(list.head == 0);
    assert(list.tail == 0);

    kmr_ckpt_int_list_free(&list);
    fprintf(stderr, "interger list test done.\n");
}
#endif
3128 
3129 /* allocator for operation item */
3130 static void *
3131 kmr_ckpt_opr_list_alocfn(void *val)
3132 {
3133  struct kmr_ckpt_operation *v = kmr_malloc(sizeof(struct kmr_ckpt_operation));
3134  *v = *(struct kmr_ckpt_operation *)val;
3135  return v;
3136 }
3137 
3138 /* deallocator for operation item */
3139 static void
3140 kmr_ckpt_opr_list_freefn(void *val)
3141 {
3142  kmr_free(val, sizeof(struct kmr_ckpt_operation));
3143 }
3144 
3145 /* comparetor for operation item */
3146 static int
3147 kmr_ckpt_opr_list_compfn(void *v1, void *v2)
3148 {
3149  struct kmr_ckpt_operation _v1 = *(struct kmr_ckpt_operation *)v1;
3150  struct kmr_ckpt_operation _v2 = *(struct kmr_ckpt_operation *)v2;
3151  if ( _v1.op_seqno > _v2.op_seqno ) {
3152  return 1;
3153  } else if ( _v1.op_seqno < _v2.op_seqno ) {
3154  return -1;
3155  } else {
3156  return 0;
3157  }
3158 }
3159 
3160 /* Initialize operation list */
3161 static void
3162 kmr_ckpt_opr_list_init(struct kmr_ckpt_list *list)
3163 {
3164  kmr_ckpt_list_init(list, kmr_ckpt_opr_list_alocfn,
3165  kmr_ckpt_opr_list_freefn, kmr_ckpt_opr_list_compfn);
3166 }
3167 
/* Clears an operation list.  This is a thin wrapper that forwards
   LIST to the generic kmr_ckpt_list_free(). */
static void
kmr_ckpt_opr_list_free(struct kmr_ckpt_list *list)
{
    kmr_ckpt_list_free(list);
}
3174 
3175 /* Add an operation to the tail of the list.
3176  If size of list is over max count, it deletes the oldest value. */
3177 static void
3178 kmr_ckpt_opr_list_add(struct kmr_ckpt_list *list, struct kmr_ckpt_operation op)
3179 {
3180  kmr_ckpt_list_add(list, &op);
3181 }
3182 
3183 /* Initialize kvs chains. */
3184 static void
3185 kmr_ckpt_kvs_chains_init(struct kmr_ckpt_kvs_chains *chains)
3186 {
3187  chains->chainlst = 0;
3188  chains->chainlst_size = 0;
3189 }
3190 
3191 /* Free kvs chains */
3192 static void
3193 kmr_ckpt_kvs_chains_free(struct kmr_ckpt_kvs_chains *chains)
3194 {
3195  for (int i = 0; i < chains->chainlst_size; i++) {
3196  struct kmr_ckpt_list *list = &chains->chainlst[i];
3197  kmr_ckpt_opr_list_free(list);
3198  }
3199  kmr_ckpt_kvs_chains_init(chains);
3200 }
3201 
3202 /* Create a new chain. */
3203 static void
3204 kmr_ckpt_kvs_chains_new_chain(struct kmr_ckpt_kvs_chains *chains,
3205  struct kmr_ckpt_operation op)
3206 {
3207  int idx = chains->chainlst_size;
3208  chains->chainlst_size += 1;
3209  chains->chainlst = (struct kmr_ckpt_list *)
3210  kmr_realloc(chains->chainlst,
3211  sizeof(struct kmr_ckpt_list) * (size_t)chains->chainlst_size);
3212  struct kmr_ckpt_list *list = &chains->chainlst[idx];
3213  kmr_ckpt_opr_list_init(list);
3214  kmr_ckpt_opr_list_add(list, op);
3215 }
3216 
3217 /* Connect an operation to an existing chain. */
3218 static void
3219 kmr_ckpt_kvs_chains_connect(struct kmr_ckpt_kvs_chains *chains,
3220  struct kmr_ckpt_operation op)
3221 {
3222  struct kmr_ckpt_list *list = kmr_ckpt_kvs_chains_find(chains, op.kvi_id);
3223  if (list != 0) {
3224  kmr_ckpt_opr_list_add(list, op);
3225  } else {
3226  kmr_ckpt_kvs_chains_new_chain(chains, op);
3227  }
3228 }
3229 
3230 /* Find a chain that contains KVS_ID.
3231  If all chains are closed (tha last operation's kvo is not KMR_CKPT_DUMMY_ID),
3232  it returns null. */
3233 static struct kmr_ckpt_list *
3234 kmr_ckpt_kvs_chains_find(struct kmr_ckpt_kvs_chains *chains, long kvo_id)
3235 {
3236  for (int i = 0; i < chains->chainlst_size; i++) {
3237  struct kmr_ckpt_list *list = &chains->chainlst[i];
3238  struct kmr_ckpt_operation *last_op =
3239  (struct kmr_ckpt_operation *)list->tail->val;
3240  if (last_op->kvo_id == KMR_CKPT_DUMMY_ID) {
3241  /* chain is closed */
3242  continue;
3243  }
3244  struct kmr_ckpt_list_item *item;
3245  for (item = list->tail; item != 0; item = item->prev) {
3246  struct kmr_ckpt_operation *op =
3247  (struct kmr_ckpt_operation *)item->val;
3248  if (op->kvo_id == kvo_id) {
3249  return list;
3250  }
3251  }
3252  }
3253  return 0;
3254 }
3255 
#if 0
/* Self-test for KVS chains (disabled by default).  It builds one
   chain of two connected operations and a second, independent chain,
   checking the chain count and chain length along the way. */
static void test_kmr_ckpt_kvs_chains(void)
{
    struct kmr_ckpt_kvs_chains chains;
    kmr_ckpt_kvs_chains_init(&chains);

    /* op1 starts the first chain and produces KVS 1. */
    struct kmr_ckpt_operation op1 = { .op_seqno = 1,
                                      .kvi_id = KMR_CKPT_DUMMY_ID,
                                      .kvo_id = 1 };
    kmr_ckpt_kvs_chains_new_chain(&chains, op1);
    assert(chains.chainlst_size == 1);

    /* op2 consumes KVS 1, so it must join the first chain. */
    struct kmr_ckpt_operation op2 = { .op_seqno = 2,
                                      .kvi_id = 1,
                                      .kvo_id = 2 };
    kmr_ckpt_kvs_chains_connect(&chains, op2);
    assert(chains.chainlst_size == 1);
    assert(chains.chainlst[0].size == 2);

    /* op3 starts a second chain. */
    struct kmr_ckpt_operation op3 = { .op_seqno = 3,
                                      .kvi_id = KMR_CKPT_DUMMY_ID,
                                      .kvo_id = 3 };
    kmr_ckpt_kvs_chains_new_chain(&chains, op3);
    assert(chains.chainlst_size == 2);

    kmr_ckpt_kvs_chains_free(&chains);
    fprintf(stderr, "kvs chains test done.\n");
}
#endif
3282 
3283 
3284 /*
3285 Copyright (C) 2012-2016 RIKEN AICS
3286 This library is distributed WITHOUT ANY WARRANTY. This library can be
3287 redistributed and/or modified under the terms of the BSD 2-Clause License.
3288 */
Key-Value Stream (abstract).
Definition: kmr.h:587
void kmr_ckpt_save_kvo_each_add(KMR *mr, KMR_KVS *kvo, long ikv_index)
It adds new key-value pairs of the output KVS to the checkpoint data file.
Definition: kmrckpt.c:2718
Utilities Private Part (do not include from applications).
Options to Mapping, Shuffling, and Reduction.
Definition: kmr.h:613
void kmr_ckpt_remove_ckpt(KMR_KVS *kvs)
It removes checkpoint data file.
Definition: kmrckpt.c:2612
#define KMR_ALIGN(X)
Rounds up a given size to the alignment restriction (currently eight bytes).
Definition: kmrimpl.h:75
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
Definition: kmrbase.c:751
void kmr_ckpt_lock_start(KMR *mr)
Define the start position of code region that is referred when restart.
Definition: kmrckpt.c:1934
int kmr_ckpt_enable_ckpt(KMR *mr, int lock_id)
It temporarily re-enables checkpoint/restart that has been disabled by calling kmr_ckpt_disable_ckpt().
Definition: kmrckpt.c:2515
int kmr_ckpt_disable_ckpt(KMR *mr)
It temporarily disables checkpoint/restart.
Definition: kmrckpt.c:2494
#define kmr_malloc(Z)
Allocates memory, or aborts when failed.
Definition: kmrimpl.h:177
void kmr_ckpt_save_kvo_block_fin(KMR *mr, KMR_KVS *kvo)
It finalizes saving block of key-value pairs of the output KVS to the checkpoint data file...
Definition: kmrckpt.c:2685
void kmr_ckpt_save_kvo_whole(KMR *mr, KMR_KVS *kvo)
It saves all key-value pairs in the output KVS to a checkpoint data file.
Definition: kmrckpt.c:2638
Definition: kmr.h:348
void kmr_ckpt_free_context(KMR *mr)
Free checkpoint context.
Definition: kmrckpt.c:162
KMR Context.
Definition: kmr.h:222
void kmr_ckpt_save_kvo_each_fin(KMR *mr, KMR_KVS *kvo)
It finalizes saving indexed key-value pairs of the output KVS to the checkpoint data file...
Definition: kmrckpt.c:2733
kmr_kv_field
Datatypes of Keys or Values.
Definition: kmr.h:325
void kmr_ckpt_save_kvo_block_add(KMR *mr, KMR_KVS *kvo, long nkvi)
It adds a new block of key-value pairs of the output KVS to the checkpoint data file.
Definition: kmrckpt.c:2670
long kmr_ckpt_first_unprocessed_kv(KMR *mr)
It returns the index of the first unprocessed key-value in the input KVS.
Definition: kmrckpt.c:2535
Handy Copy of a Key-Value Field.
Definition: kmr.h:358
void kmr_ckpt_save_kvo_each_init(KMR *mr, KMR_KVS *kvo)
It initializes saving indexed key-value pairs of the output KVS to a checkpoint data file...
Definition: kmrckpt.c:2703
void kmr_ckpt_progress_fin(KMR *mr)
It finalizes the progress of MapReduce checkpointing.
Definition: kmrckpt.c:2845
void kmr_ckpt_lock_finish(KMR *mr)
Define the end position of code region that is referred when restart.
Definition: kmrckpt.c:1945
void kmr_ckpt_create_context(KMR *mr)
Initialize checkpoint context.
Definition: kmrckpt.c:119
#define kmr_realloc(P, Z)
Allocates memory, or aborts when failed.
Definition: kmrimpl.h:181
KMR Interface.
Checkpoint/Restart Support.
int kmr_ckpt_progress_init(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
It initializes a progress of MapReduce checkpointing.
Definition: kmrckpt.c:2753
int kmr_ckpt_enabled(KMR *mr)
Check if checkpoint/restart is enabled.
Definition: kmrckpt.c:2478
static struct kmr_kv_box kmr_pick_kv(struct kmr_kvs_entry *e, KMR_KVS *kvs)
Returns a handle to a key-value entry – a reverse of kmr_poke_kv().
Definition: kmrimpl.h:551
Definition: kmrckpt.h:143
void kmr_ckpt_save_kvo_block_init(KMR *mr, KMR_KVS *kvo)
It initializes saving blocks of key-value pairs of the output KVS to a checkpoint data file...
Definition: kmrckpt.c:2654
void kmr_ckpt_restore_ckpt(KMR_KVS *kvs)
It restores checkpoint data to kvs.
Definition: kmrckpt.c:2557