KMR
testckpt3.c
1 /* testckpt3 (2015-03-14) */
2 
3 /* K-Means program for testing checkpoint restart in 'selective' mode.
4 
5  This program can be run like the following.
6 
7  $ mpirun -np 4 ./a.out -c 2 -p 10
8 
9  It will be aborted at the end of the second iteration of k-means
10  algorithm. There will be two types of checkpoint files.
11 
12  1. checkpoint files generated by KMR stored in 'ckptdir0000*'
13  directories.
14  2. a checkpoint file that stores k-means progress.
15  Its file name is 'kmeans_ckpt'.
16 
17  You can restart the program from the state stored in the above
18  checkpoint files. In this case, you don't need to specify
19  parameters. If you do, they will be ignored.
20 
21  $ mpirun -np 4 ./a.out
22 
23  The program will be completed after you run it three times.
24 
25  When run with the following parameters, this program will perform
26  answer checking.
27 
28  $ mpirun -np 2 ./a.out -c 4 -p 10
29  $ mpirun -np 4 ./a.out -c 2 -p 10
30  $ mpirun -np 4 ./a.out -c 4 -p 10
31  $ mpirun -np 8 ./a.out -c 4 -p 10
32 
33 */
34 
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <unistd.h>
38 #include <sys/time.h>
39 #include <limits.h>
40 #include <math.h>
41 #include <mpi.h>
42 #include "kmr.h"
43 
44 #define DEF_NUM_ITERATION 10
45 #define DEF_NUM_POINTS 10000
46 #define DEF_NUM_MEANS 100
47 #define DEF_DIM 3
48 #define DEF_GRID_SIZE 1000
49 
50 #define KMEANS_CKPT_FILE "./kmeans_ckpt"
51 #define KMEANS_CKPT_FILE_OLD "./kmeans_ckpt.bk"
52 
53 struct kmr_option kmr_inspect = { .inspect = 1 };
54 struct kmr_option kmr_ckpt = { .take_ckpt = 1 };
55 
56 typedef struct {
57  // number of processes
58  int nprocs;
59  // number of k-means iteration
60  int n_iteration;
61  // current iteration number
62  int c_iteration;
63  // size of grid
64  int grid_size;
65  // dimention of points
66  int dim;
67  // number of points
68  int n_points;
69  // points
70  int *points;
71  // number of means
72  int n_means;
73  // means
74  int *means;
75 } kmeans_t;
76 
77 
78 /*
79  * Ths following code for random number is copied from C standard
80  * chapter 7.20.2.
81  *
82  * http://www.open-std.org/jtc1/sc22/wg14/www/docs/n1256.pdf
83  */
84 static unsigned long int _rand_next = 1;
85 
86 static int _rand(void)
87 {
88  _rand_next = _rand_next * 1103515245 + 12345;
89  return (unsigned int)(_rand_next/65536) % 32768;
90 }
91 
92 static void _srand(unsigned int seed)
93 {
94  _rand_next = seed;
95 }
96 
97 
98 static int
99 measure_time(struct timeval *tv)
100 {
101  MPI_Barrier(MPI_COMM_WORLD);
102  if (gettimeofday(tv, NULL) == -1) {
103  perror("gettimeofday");
104  return -1;
105  }
106  return 0;
107 }
108 
109 static double
110 calc_time_diff(struct timeval *tv_s, struct timeval *tv_e)
111 {
112  return ((double)tv_e->tv_sec - (double)tv_s->tv_sec)
113  + ((double)tv_e->tv_usec - (double)tv_s->tv_usec) /1000000.0;
114 }
115 
116 static _Bool
117 is_restart()
118 {
119  _Bool result = 0;
120  int ret = access(KMEANS_CKPT_FILE, R_OK);
121  if (ret == 0) {
122  result = 1;
123  }
124  return result;
125 }
126 
127 /* Parse commandline arguments */
128 static void
129 parse_args(int argc, char **argv, kmeans_t *kmeans)
130 {
131  int c;
132  extern char *optarg;
133 
134  while ((c = getopt(argc, argv, "i:d:c:p:s:")) != EOF) {
135  switch (c) {
136  case 'i':
137  kmeans->n_iteration = atoi(optarg);
138  break;
139  case 'd':
140  kmeans->dim = atoi(optarg);
141  break;
142  case 'c':
143  kmeans->n_means = atoi(optarg);
144  break;
145  case 'p':
146  kmeans->n_points = atoi(optarg);
147  break;
148  case 's':
149  kmeans->grid_size = atoi(optarg);
150  break;
151  case '?':
152  printf("Usage: %s -i <num iteration> -d <vector dimension> "
153  "-c <num clusters> -p <num points> -s <max value>\n",
154  argv[0]);
155  MPI_Abort(MPI_COMM_WORLD, 1);
156  }
157  }
158 
159  if (kmeans->n_iteration <= 0 || kmeans->dim <= 0 || kmeans->n_means <= 0
160  || kmeans->n_points <= 0 || kmeans->grid_size <= 0) {
161  printf("Illegal argument value. All values must be numeric and "
162  "greater than 0\n");
163  MPI_Abort(MPI_COMM_WORLD, 1);
164  }
165 }
166 
167 static int
168 save_kmeans_state(kmeans_t *kmeans)
169 {
170  const char *tmp_file = KMEANS_CKPT_FILE ".tmp";
171  FILE *fp = fopen(tmp_file, "w+");
172  if (fp == NULL) {
173  fprintf(stderr, "Failed to open kmeans state file.\n");
174  return 1;
175  }
176  size_t size = sizeof(kmeans_t);
177  size_t ret = fwrite(&size, sizeof(size_t), 1, fp);
178  if (ret != 1) {
179  fprintf(stderr, "Failed to write size of kmeans data.\n");
180  return 1;
181  }
182  ret = fwrite(kmeans, size, 1, fp);
183  if (ret != 1) {
184  fprintf(stderr, "Failed to write kmeans data.\n");
185  return 1;
186  }
187  size = sizeof(int) * (size_t)kmeans->n_means * (size_t)kmeans->dim;
188  ret = fwrite(&size, sizeof(size_t), 1, fp);
189  if (ret != 1) {
190  fprintf(stderr, "Failed to write size of means array.\n");
191  return 1;
192  }
193  ret = fwrite(kmeans->means, size, 1, fp);
194  if (ret != 1) {
195  fprintf(stderr, "Failed to write means array.\n");
196  return 1;
197  }
198  fflush(fp);
199  fsync(fileno(fp));
200  fclose(fp);
201 
202  rename(KMEANS_CKPT_FILE, KMEANS_CKPT_FILE_OLD);
203  rename(tmp_file, KMEANS_CKPT_FILE);
204  return 0;
205 }
206 
207 static int
208 load_kmeans_state(kmeans_t *kmeans)
209 {
210  FILE *fp = fopen(KMEANS_CKPT_FILE, "r");
211  if (fp == NULL) {
212  fprintf(stderr, "Failed to open kmeans state file.\n");
213  return 1;
214  }
215  size_t size;
216  size_t ret = fread(&size, sizeof(size_t), 1, fp);
217  if (ret != 1) {
218  fprintf(stderr, "Failed to read size of kmeans data.\n");
219  return 1;
220  }
221  ret = fread(kmeans, size, 1, fp);
222  if (ret != 1) {
223  fprintf(stderr, "Failed to read kmeans data.\n");
224  return 1;
225  }
226  kmeans->points = NULL;
227  ret = fread(&size, sizeof(size_t), 1, fp);
228  if (ret != 1) {
229  fprintf(stderr, "Failed to read size of means array.\n");
230  return 1;
231  }
232  if (size != (size_t)kmeans->n_means * (size_t)kmeans->dim * sizeof(int)) {
233  fprintf(stderr, "The size of means array is mismatch.\n");
234  return 1;
235  }
236  kmeans->means = (int*)malloc(size);
237  ret = fread(kmeans->means, size, 1, fp);
238  if (ret != 1) {
239  fprintf(stderr, "Failed to read kmeans data.\n");
240  return 1;
241  }
242  return 0;
243 }
244 
245 static void
246 delete_kmeans_state()
247 {
248  remove(KMEANS_CKPT_FILE_OLD);
249  remove(KMEANS_CKPT_FILE);
250 }
251 
252 /* Generate the points. */
253 static void
254 generate_randoms(int *pts, int size, int max_val)
255 {
256  int i;
257  for (i = 0; i < size; i++)
258  pts[i] = (_rand() % max_val);
259 }
260 
261 /* KMR map function
262  It copies multi-dimension points to KVS.
263 */
264 static int
265 generate_points(const struct kmr_kv_box kv,
266  const KMR_KVS *kvi, KMR_KVS *kvo, void *p, long i_)
267 {
268  int i;
269  kmeans_t *kmeans = (kmeans_t *)p;
270  int *points = kmeans->points;
271 
272  for (i = 0; i < kmeans->n_points * kmeans->dim; i += kmeans->dim) {
273  struct kmr_kv_box nkv = { .klen = (int)sizeof(long),
274  .vlen = kmeans->dim * (int)sizeof(int),
275  .k.i = i,
276  .v.p = (void *)&points[i] };
277  kmr_add_kv(kvo, nkv);
278  }
279 
280  return MPI_SUCCESS;
281 }
282 
283 /* calculate squared distance
284  */
285 static unsigned int
286 calc_sq_dist(int *v1, int *v2, int dim)
287 {
288  int i;
289  unsigned int sum = 0;
290  for (i = 0; i < dim; i++)
291  sum += (unsigned int)((v1[i] - v2[i]) * (v1[i] - v2[i]));
292  return sum;
293 }
294 
295 /* KMR map function
296  It calculates cluster of a point stored in kv.
297  It emits a Key-Value whose key is cluster id (index of array
298  "kmeans.means") and value is the point.
299 */
300 static int
301 calc_cluster(const struct kmr_kv_box kv,
302  const KMR_KVS *kvi, KMR_KVS *kvo, void *p, long i_)
303 {
304  int i;
305  kmeans_t *kmeans = (kmeans_t *)p;
306  int dim = kmeans->dim;
307  int *means = kmeans->means;
308  int n_means = kmeans->n_means;
309  int *point = (int *)kv.v.p;
310  int min_idx = 0;
311  unsigned int min_dist = calc_sq_dist(point, &means[0], dim);
312 
313  for (i = 1; i < n_means; i++) {
314  unsigned int dist = calc_sq_dist(point, &means[i * dim], dim);
315  if (dist < min_dist) {
316  min_idx = i;
317  min_dist = dist;
318  }
319  }
320  struct kmr_kv_box nkv = { .klen = (int)sizeof(long),
321  .vlen = dim * (int)sizeof(int),
322  .k.i = min_idx,
323  .v.p = (void *)point };
324  kmr_add_kv(kvo, nkv);
325 
326  return MPI_SUCCESS;
327 }
328 
329 /* KMR reduce function
330  It calculates center of clusters.
331  It emits a Key-Value whose key is cluster id and value is the new center.
332 */
333 static int
334 update_cluster(const struct kmr_kv_box kv[], const long n,
335  const KMR_KVS *kvi, KMR_KVS *kvo, void *p)
336 {
337  int i, j;
338  int cid = (int)kv[0].k.i;
339  kmeans_t *kmeans = (kmeans_t *)p;
340  int dim = kmeans->dim;
341  long sum[dim];
342  int average[dim];
343 
344  for (i = 0; i < dim; i++)
345  sum[i] = 0;
346  for (i = 0; i < n; i++) {
347  int *point = (int *)kv[i].v.p;
348  for (j = 0; j < dim; j++) {
349  sum[j] += point[j];
350  }
351  }
352  for (i = 0; i < dim; i++)
353  average[i] = (int)(sum[i] / n);
354 
355  struct kmr_kv_box nkv = { .klen = sizeof(long),
356  .vlen = dim * (int)sizeof(int),
357  .k.i = cid,
358  .v.p = (void *)average };
359  kmr_add_kv(kvo, nkv);
360 
361  return MPI_SUCCESS;
362 }
363 
364 /* KMR map function
365  It copies centers of clusters stored in kv to "kmeans.means" array.
366 */
367 static int
368 copy_center(const struct kmr_kv_box kv,
369  const KMR_KVS *kvi, KMR_KVS *kvo, void *p, long i_)
370 {
371  int i;
372  int cid = (int)kv.k.i;
373  int *center = (int *)kv.v.p;
374  kmeans_t *kmeans = (kmeans_t *)p;
375  int dim = kmeans->dim;
376  int *target = &kmeans->means[cid * dim];
377 
378  for (i = 0; i < dim; i++)
379  target[i] = center[i];
380 
381  return MPI_SUCCESS;
382 }
383 
384 static void
385 print_means(int *means, int size, int dim)
386 {
387  int i, j;
388  for (i = 0; i < size; i++) {
389  int *mean = &means[i * dim];
390  fprintf(stderr, "( ");
391  for (j = 0; j < dim; j++)
392  fprintf(stderr, "%d ", mean[j]);
393  fprintf(stderr, ")\n");
394  }
395 }
396 
397 static void
398 print_progress(const char *msg, int rank, int n_itr)
399 {
400  MPI_Barrier(MPI_COMM_WORLD);
401  if (rank == 0) {
402  fprintf(stderr, "ITR[%d]: %s\n", n_itr, msg);
403  }
404  MPI_Barrier(MPI_COMM_WORLD);
405 }
406 
407 static void
408 check_answer(kmeans_t *kmeans)
409 {
410  int count = 0;
411  int *answer = NULL;
412  if (kmeans->nprocs == 2 &&
413  kmeans->dim == DEF_DIM &&
414  kmeans->grid_size == DEF_GRID_SIZE &&
415  kmeans->n_points == 10 &&
416  kmeans->n_means == 4 &&
417  kmeans->n_iteration >= DEF_NUM_ITERATION) {
418  // 2 nodes, 10 points, 4 centers
419  printf("\n"
420  "Case of 2 nodes, 10 points and 4 centers.\n"
421  "Check the answer...");
422  count = 4 * kmeans->dim;
423  answer = (int*)malloc((size_t)count * sizeof(int));
424  answer[0] = 906;
425  answer[1] = 733;
426  answer[2] = 301;
427  answer[3] = 426;
428  answer[4] = 179;
429  answer[5] = 279;
430  answer[6] = 121;
431  answer[7] = 702;
432  answer[8] = 107;
433  answer[9] = 349;
434  answer[10] = 727;
435  answer[11] = 751;
436  } else if (kmeans->nprocs == 4 &&
437  kmeans->dim == DEF_DIM &&
438  kmeans->grid_size == DEF_GRID_SIZE &&
439  kmeans->n_points == 10 &&
440  kmeans->n_means == 2 &&
441  kmeans->n_iteration >= DEF_NUM_ITERATION) {
442  // 4 nodes, 10 points, 2 centers
443  printf("\n"
444  "Case of 4 nodes, 10 points and 2 centers.\n"
445  "Check the answer...");
446  count = 2 * kmeans->dim;
447  answer = (int*)malloc((size_t)count * sizeof(int));
448  answer[0] = 830;
449  answer[1] = 787;
450  answer[2] = 389;
451  answer[3] = 346;
452  answer[4] = 425;
453  answer[5] = 448;
454  } else if (kmeans->nprocs == 4 &&
455  kmeans->dim == DEF_DIM &&
456  kmeans->grid_size == DEF_GRID_SIZE &&
457  kmeans->n_points == 10 &&
458  kmeans->n_means == 4 &&
459  kmeans->n_iteration >= DEF_NUM_ITERATION) {
460  // 4 nodes, 10 points, 4 centers
461  printf("\n"
462  "Case of 4 nodes, 10 points and 4 centers.\n"
463  "Check the answer...");
464  count = 4 * kmeans->dim;
465  answer = (int*)malloc((size_t)count * sizeof(int));
466  answer[0] = 870;
467  answer[1] = 756;
468  answer[2] = 434;
469  answer[3] = 654;
470  answer[4] = 220;
471  answer[5] = 435;
472  answer[6] = 194;
473  answer[7] = 501;
474  answer[8] = 167;
475  answer[9] = 230;
476  answer[10] = 609;
477  answer[11] = 729;
478  } else if (kmeans->nprocs == 8 &&
479  kmeans->dim == DEF_DIM &&
480  kmeans->grid_size == DEF_GRID_SIZE &&
481  kmeans->n_points == 10 &&
482  kmeans->n_means == 4 &&
483  kmeans->n_iteration >= DEF_NUM_ITERATION) {
484  // 8 nodes, 10 points, 4 centers
485  printf("\n"
486  "Case of 8 nodes, 10 points and 4 centers.\n"
487  "Check the answer...");
488  count = 4 * kmeans->dim;
489  answer = (int*)malloc((size_t)count * sizeof(int));
490  answer[0] = 809;
491  answer[1] = 753;
492  answer[2] = 467;
493  answer[3] = 668;
494  answer[4] = 260;
495  answer[5] = 495;
496  answer[6] = 194;
497  answer[7] = 438;
498  answer[8] = 235;
499  answer[9] = 220;
500  answer[10] = 620;
501  answer[11] = 792;
502  }
503 
504  if (count != 0) {
505  _Bool ok = 1;
506  for (int i = 0; i < count; i++) {
507  if (kmeans->means[i] != answer[i]) {
508  ok = 0;
509  break;
510  }
511  }
512  if (ok) {
513  printf("The answer is correct.\n");
514  } else {
515  printf("The answer is wrong.\n");
516  }
517  free(answer);
518  }
519 }
520 
521 ////////////////////////////////////////////////////////////////////////////////
522 
523 int
524 main(int argc, char **argv)
525 {
526  kmeans_t kmeans;
527  int nprocs, rank;
528  int p_iteration = -1;
529 
530  MPI_Init(&argc, &argv);
531  MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
532  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
533 
534  kmr_init();
535  MPI_Info info;
536  MPI_Info_create(&info);
537  MPI_Info_set(info, "ckpt_enable", "1");
538  MPI_Info_set(info, "ckpt_selective", "1");
539  KMR *mr = kmr_create_context(MPI_COMM_WORLD, info, 0);
540  MPI_Info_free(&info);
541 
542  // Initialize using MPI functions
543  _srand((unsigned int)(rank + 1));
544  //srand((rank + 1) * getpid());
545  if (rank == 0){
546  if (is_restart()) {
547  load_kmeans_state(&kmeans);
548  p_iteration = kmeans.c_iteration;
549  kmeans.c_iteration = 0;
550  printf("*** Restarted ***\n\n");
551  } else {
552  kmeans.nprocs = nprocs;
553  kmeans.n_iteration = DEF_NUM_ITERATION;
554  kmeans.c_iteration = 0;
555  kmeans.grid_size = DEF_GRID_SIZE;
556  kmeans.dim = DEF_DIM;
557  kmeans.n_points = DEF_NUM_POINTS;
558  kmeans.n_means = DEF_NUM_MEANS;
559 
560  parse_args(argc, argv, &kmeans);
561  // set initial centers randomly on rank 0
562  kmeans.means = (int *) malloc((size_t)kmeans.n_means * (size_t)kmeans.dim * sizeof(int));
563  generate_randoms(kmeans.means, kmeans.n_means * kmeans.dim,
564  kmeans.grid_size);
565  }
566 
567  printf("#### Configuration ###########################\n");
568  printf("Number of processes = %d\n", nprocs);
569  printf("Iteration = %d\n", kmeans.n_iteration);
570  printf("Dimension = %d\n", kmeans.dim);
571  printf("Number of clusters = %d\n", kmeans.n_means);
572  printf("Number of points = %d\n", kmeans.n_points);
573  printf("##############################################\n");
574  }
575  MPI_Bcast(&(kmeans.n_iteration), 1, MPI_INT, 0, MPI_COMM_WORLD);
576  MPI_Bcast(&(kmeans.c_iteration), 1, MPI_INT, 0, MPI_COMM_WORLD);
577  MPI_Bcast(&(kmeans.grid_size), 1, MPI_INT, 0, MPI_COMM_WORLD);
578  MPI_Bcast(&(kmeans.dim), 1, MPI_INT, 0, MPI_COMM_WORLD);
579  MPI_Bcast(&(kmeans.n_points), 1, MPI_INT, 0, MPI_COMM_WORLD);
580  MPI_Bcast(&(kmeans.n_means), 1, MPI_INT, 0, MPI_COMM_WORLD);
581  MPI_Bcast(&p_iteration, 1, MPI_INT, 0, MPI_COMM_WORLD);
582  if (rank != 0) {
583  kmeans.means = (int *)malloc((size_t)kmeans.n_means * (size_t)kmeans.dim * sizeof(int));
584  }
585  MPI_Bcast(kmeans.means, kmeans.n_means * kmeans.dim, MPI_INT,
586  0, MPI_COMM_WORLD);
587  if (is_restart() != 1) {
588  // set points randomly on each rank
589  kmeans.points = (int *)malloc((size_t)kmeans.n_points * (size_t)kmeans.dim * sizeof(int));
590  generate_randoms(kmeans.points, kmeans.n_points * kmeans.dim,
591  kmeans.grid_size);
592  }
593  KMR_KVS *kvs_points = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
594  // HINT ON SELECTIVE
595  //
596  // If this kvs (kvs_points) is not saved to a ckpt file,
597  // the result reported by restart process is not correct.
598  // The reason is that this kvs is referred in every iteration, and
599  // once restarted, whole kvs is lost if the ckpt file is not exist.
600  kmr_map_once(kvs_points, (void *)&kmeans, kmr_ckpt, 0, generate_points);
601  print_progress("map_onece done", rank, 0);
602 
603  // kernel //
604  struct timeval tv_s, tv_e;
605  double time_diff, total_time = 0.0;
606  while (kmeans.c_iteration < kmeans.n_iteration) {
607  // measure iteration start time
608  if (kmeans.c_iteration > p_iteration) {
609  if (measure_time(&tv_s) == -1) {
610  MPI_Abort(MPI_COMM_WORLD, 1);
611  }
612  }
613 
614  // call kmr_map to calculate cluster
615  KMR_KVS *kvs_c2p = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
616  kmr_map(kvs_points, kvs_c2p, (void *)&kmeans, kmr_inspect,
617  calc_cluster);
618  print_progress("map done", rank, kmeans.c_iteration);
619 
620  // call kmr_shuffle to gather points which belong to the same cluster
621  KMR_KVS *kvs_c2p_s = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
622  kmr_shuffle(kvs_c2p, kvs_c2p_s, kmr_noopt);
623  print_progress("shuffle done", rank, kmeans.c_iteration);
624 
625  // call kmr_reduce to update cluster center
626  KMR_KVS *kvs_cluster = kmr_create_kvs(mr,
627  KMR_KV_INTEGER, KMR_KV_OPAQUE);
628  kmr_reduce(kvs_c2p_s, kvs_cluster, (void *)&kmeans, kmr_noopt,
629  update_cluster);
630  print_progress("reduce done", rank, kmeans.c_iteration);
631 
632  if (mr->ckpt_enable && kmeans.c_iteration > (p_iteration + 1)
633  && kmeans.c_iteration == 4) {
634  if (rank == 0) {
635  fprintf(stderr, "Aborted on rank 0 for testing purpose.\n");
636  MPI_Abort(MPI_COMM_WORLD, 1);
637  }
638  }
639 
640  // cal kmr_replicate to share new cluster centers
641  KMR_KVS *kvs_all_clusters =
642  kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
643  // HINT ON SELECTIVE
644  //
645  // A ckpt file is taken for recording progress.
646  kmr_replicate(kvs_cluster, kvs_all_clusters, kmr_ckpt);
647  print_progress("replicate done", rank, kmeans.c_iteration);
648 
649  // call kmr_map to copy new cluster centers to kmeans.means array.
650  kmr_map(kvs_all_clusters, NULL, (void *)&kmeans, kmr_noopt,
651  copy_center);
652 
653  // measure iteration end time
654  if (kmeans.c_iteration > p_iteration) {
655  if (measure_time(&tv_e) == -1) {
656  MPI_Abort(MPI_COMM_WORLD, 1);
657  }
658  if (rank == 0) {
659  time_diff = calc_time_diff(&tv_s, &tv_e);
660  total_time += time_diff;
661  //printf("Iteration[%2d]: Elapse time: %f\n", itr, time_diff);
662  print_means(kmeans.means, kmeans.n_means, kmeans.dim);
663  if (save_kmeans_state(&kmeans)) {
664  MPI_Abort(MPI_COMM_WORLD, 1);
665  }
666  }
667  }
668 
669  if (mr->ckpt_enable && kmeans.c_iteration > p_iteration
670  && kmeans.c_iteration == 1) {
671  if (rank == 0) {
672  fprintf(stderr, "Aborted on rank 0 for testing purpose.\n");
673  MPI_Abort(MPI_COMM_WORLD, 1);
674  }
675  }
676 
677  kmeans.c_iteration += 1;
678  }
679  if (rank == 0) {
680  printf("Total elapse time: %f\n", total_time);
681  check_answer(&kmeans);
682  delete_kmeans_state();
683  }
684 
685  free(kmeans.means);
686  if (is_restart() != 1) {
687  free(kmeans.points);
688  }
689  kmr_free_kvs(kvs_points);
690  kmr_free_context(mr);
691  kmr_fin();
692 
693  MPI_Finalize();
694  return 0;
695 }
Key-Value Stream (abstract).
Definition: kmr.h:587
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs.
Definition: kmr.h:88
Options to Mapping, Shuffling, and Reduction.
Definition: kmr.h:613
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
Definition: kmrbase.c:751
int kmr_map_once(KMR_KVS *kvo, void *arg, struct kmr_option opt, _Bool rank_zero_only, kmr_mapfn_t m)
Maps once.
Definition: kmrbase.c:1402
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
Definition: kmr.h:71
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
Definition: kmrbase.c:2036
KMR Context.
Definition: kmr.h:222
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
Definition: kmrbase.c:621
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Definition: kmr.h:82
Handy Copy of a Key-Value Field.
Definition: kmr.h:358
int kmr_fin(void)
Clears the environment.
Definition: kmrbase.c:124
#define kmr_init()
Sets up the environment.
Definition: kmr.h:747
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
Definition: kmrbase.c:326
KMR Interface.
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gat...
Definition: kmrbase.c:2182
static void parse_args(char *, char *[])
Parses command parameters given for mapper and reducer arguments.
Definition: kmrshell.c:257
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).
Definition: kmrbase.c:147