44 #define DEF_NUM_ITERATION 10 45 #define DEF_NUM_POINTS 10000 46 #define DEF_NUM_MEANS 100 48 #define DEF_GRID_SIZE 1000 50 #define KMEANS_CKPT_FILE "./kmeans_ckpt" 51 #define KMEANS_CKPT_FILE_OLD "./kmeans_ckpt.bk" 53 struct kmr_option kmr_inspect = { .inspect = 1 };
54 struct kmr_option kmr_ckpt = { .take_ckpt = 1 };
84 static unsigned long int _rand_next = 1;
86 static int _rand(
void)
88 _rand_next = _rand_next * 1103515245 + 12345;
89 return (
unsigned int)(_rand_next/65536) % 32768;
92 static void _srand(
unsigned int seed)
99 measure_time(
struct timeval *tv)
101 MPI_Barrier(MPI_COMM_WORLD);
102 if (gettimeofday(tv, NULL) == -1) {
103 perror(
"gettimeofday");
110 calc_time_diff(
struct timeval *tv_s,
struct timeval *tv_e)
112 return ((
double)tv_e->tv_sec - (
double)tv_s->tv_sec)
113 + ((double)tv_e->tv_usec - (
double)tv_s->tv_usec) /1000000.0;
120 int ret = access(KMEANS_CKPT_FILE, R_OK);
134 while ((c = getopt(argc, argv,
"i:d:c:p:s:")) != EOF) {
137 kmeans->n_iteration = atoi(optarg);
140 kmeans->dim = atoi(optarg);
143 kmeans->n_means = atoi(optarg);
146 kmeans->n_points = atoi(optarg);
149 kmeans->grid_size = atoi(optarg);
152 printf(
"Usage: %s -i <num iteration> -d <vector dimension> " 153 "-c <num clusters> -p <num points> -s <max value>\n",
155 MPI_Abort(MPI_COMM_WORLD, 1);
159 if (kmeans->n_iteration <= 0 || kmeans->dim <= 0 || kmeans->n_means <= 0
160 || kmeans->n_points <= 0 || kmeans->grid_size <= 0) {
161 printf(
"Illegal argument value. All values must be numeric and " 163 MPI_Abort(MPI_COMM_WORLD, 1);
170 const char *tmp_file = KMEANS_CKPT_FILE
".tmp";
171 FILE *fp = fopen(tmp_file,
"w+");
173 fprintf(stderr,
"Failed to open kmeans state file.\n");
177 size_t ret = fwrite(&size,
sizeof(
size_t), 1, fp);
179 fprintf(stderr,
"Failed to write size of kmeans data.\n");
182 ret = fwrite(kmeans, size, 1, fp);
184 fprintf(stderr,
"Failed to write kmeans data.\n");
187 size =
sizeof(int) * (
size_t)kmeans->n_means * (size_t)kmeans->dim;
188 ret = fwrite(&size,
sizeof(
size_t), 1, fp);
190 fprintf(stderr,
"Failed to write size of means array.\n");
193 ret = fwrite(kmeans->means, size, 1, fp);
195 fprintf(stderr,
"Failed to write means array.\n");
202 rename(KMEANS_CKPT_FILE, KMEANS_CKPT_FILE_OLD);
203 rename(tmp_file, KMEANS_CKPT_FILE);
210 FILE *fp = fopen(KMEANS_CKPT_FILE,
"r");
212 fprintf(stderr,
"Failed to open kmeans state file.\n");
216 size_t ret = fread(&size,
sizeof(
size_t), 1, fp);
218 fprintf(stderr,
"Failed to read size of kmeans data.\n");
221 ret = fread(kmeans, size, 1, fp);
223 fprintf(stderr,
"Failed to read kmeans data.\n");
226 kmeans->points = NULL;
227 ret = fread(&size,
sizeof(
size_t), 1, fp);
229 fprintf(stderr,
"Failed to read size of means array.\n");
232 if (size != (
size_t)kmeans->n_means * (
size_t)kmeans->dim *
sizeof(
int)) {
233 fprintf(stderr,
"The size of means array is mismatch.\n");
236 kmeans->means = (
int*)malloc(size);
237 ret = fread(kmeans->means, size, 1, fp);
239 fprintf(stderr,
"Failed to read kmeans data.\n");
246 delete_kmeans_state()
248 remove(KMEANS_CKPT_FILE_OLD);
249 remove(KMEANS_CKPT_FILE);
254 generate_randoms(
int *pts,
int size,
int max_val)
257 for (i = 0; i < size; i++)
258 pts[i] = (_rand() % max_val);
270 int *points = kmeans->points;
272 for (i = 0; i < kmeans->n_points * kmeans->dim; i += kmeans->dim) {
273 struct kmr_kv_box nkv = { .klen = (int)
sizeof(
long),
274 .vlen = kmeans->dim * (int)
sizeof(
int),
276 .v.p = (
void *)&points[i] };
286 calc_sq_dist(
int *v1,
int *v2,
int dim)
289 unsigned int sum = 0;
290 for (i = 0; i < dim; i++)
291 sum += (
unsigned int)((v1[i] - v2[i]) * (v1[i] - v2[i]));
306 int dim = kmeans->dim;
307 int *means = kmeans->means;
308 int n_means = kmeans->n_means;
309 int *point = (
int *)kv.v.p;
311 unsigned int min_dist = calc_sq_dist(point, &means[0], dim);
313 for (i = 1; i < n_means; i++) {
314 unsigned int dist = calc_sq_dist(point, &means[i * dim], dim);
315 if (dist < min_dist) {
320 struct kmr_kv_box nkv = { .klen = (int)
sizeof(
long),
321 .vlen = dim * (int)
sizeof(
int),
323 .v.p = (
void *)point };
334 update_cluster(
const struct kmr_kv_box kv[],
const long n,
338 int cid = (int)kv[0].k.i;
340 int dim = kmeans->dim;
344 for (i = 0; i < dim; i++)
346 for (i = 0; i < n; i++) {
347 int *point = (
int *)kv[i].v.p;
348 for (j = 0; j < dim; j++) {
352 for (i = 0; i < dim; i++)
353 average[i] = (
int)(sum[i] / n);
355 struct kmr_kv_box nkv = { .klen =
sizeof(long),
356 .vlen = dim * (
int)
sizeof(int),
358 .v.p = (
void *)average };
372 int cid = (int)kv.k.i;
373 int *center = (
int *)kv.v.p;
375 int dim = kmeans->dim;
376 int *target = &kmeans->means[cid * dim];
378 for (i = 0; i < dim; i++)
379 target[i] = center[i];
385 print_means(
int *means,
int size,
int dim)
388 for (i = 0; i < size; i++) {
389 int *mean = &means[i * dim];
390 fprintf(stderr,
"( ");
391 for (j = 0; j < dim; j++)
392 fprintf(stderr,
"%d ", mean[j]);
393 fprintf(stderr,
")\n");
398 print_progress(
const char *msg,
int rank,
int n_itr)
400 MPI_Barrier(MPI_COMM_WORLD);
402 fprintf(stderr,
"ITR[%d]: %s\n", n_itr, msg);
404 MPI_Barrier(MPI_COMM_WORLD);
412 if (kmeans->nprocs == 2 &&
413 kmeans->dim == DEF_DIM &&
414 kmeans->grid_size == DEF_GRID_SIZE &&
415 kmeans->n_points == 10 &&
416 kmeans->n_means == 4 &&
417 kmeans->n_iteration >= DEF_NUM_ITERATION) {
420 "Case of 2 nodes, 10 points and 4 centers.\n" 421 "Check the answer...");
422 count = 4 * kmeans->dim;
423 answer = (
int*)malloc((
size_t)count *
sizeof(int));
436 }
else if (kmeans->nprocs == 4 &&
437 kmeans->dim == DEF_DIM &&
438 kmeans->grid_size == DEF_GRID_SIZE &&
439 kmeans->n_points == 10 &&
440 kmeans->n_means == 2 &&
441 kmeans->n_iteration >= DEF_NUM_ITERATION) {
444 "Case of 4 nodes, 10 points and 2 centers.\n" 445 "Check the answer...");
446 count = 2 * kmeans->dim;
447 answer = (
int*)malloc((
size_t)count *
sizeof(int));
454 }
else if (kmeans->nprocs == 4 &&
455 kmeans->dim == DEF_DIM &&
456 kmeans->grid_size == DEF_GRID_SIZE &&
457 kmeans->n_points == 10 &&
458 kmeans->n_means == 4 &&
459 kmeans->n_iteration >= DEF_NUM_ITERATION) {
462 "Case of 4 nodes, 10 points and 4 centers.\n" 463 "Check the answer...");
464 count = 4 * kmeans->dim;
465 answer = (
int*)malloc((
size_t)count *
sizeof(int));
478 }
else if (kmeans->nprocs == 8 &&
479 kmeans->dim == DEF_DIM &&
480 kmeans->grid_size == DEF_GRID_SIZE &&
481 kmeans->n_points == 10 &&
482 kmeans->n_means == 4 &&
483 kmeans->n_iteration >= DEF_NUM_ITERATION) {
486 "Case of 8 nodes, 10 points and 4 centers.\n" 487 "Check the answer...");
488 count = 4 * kmeans->dim;
489 answer = (
int*)malloc((
size_t)count *
sizeof(int));
506 for (
int i = 0; i < count; i++) {
507 if (kmeans->means[i] != answer[i]) {
513 printf(
"The answer is correct.\n");
515 printf(
"The answer is wrong.\n");
524 main(
int argc,
char **argv)
528 int p_iteration = -1;
530 MPI_Init(&argc, &argv);
531 MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
532 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
536 MPI_Info_create(&info);
537 MPI_Info_set(info,
"ckpt_enable",
"1");
538 MPI_Info_set(info,
"ckpt_selective",
"1");
540 MPI_Info_free(&info);
543 _srand((
unsigned int)(rank + 1));
547 load_kmeans_state(&kmeans);
548 p_iteration = kmeans.c_iteration;
549 kmeans.c_iteration = 0;
550 printf(
"*** Restarted ***\n\n");
552 kmeans.nprocs = nprocs;
553 kmeans.n_iteration = DEF_NUM_ITERATION;
554 kmeans.c_iteration = 0;
555 kmeans.grid_size = DEF_GRID_SIZE;
556 kmeans.dim = DEF_DIM;
557 kmeans.n_points = DEF_NUM_POINTS;
558 kmeans.n_means = DEF_NUM_MEANS;
562 kmeans.means = (
int *) malloc((
size_t)kmeans.n_means * (size_t)kmeans.dim *
sizeof(
int));
563 generate_randoms(kmeans.means, kmeans.n_means * kmeans.dim,
567 printf(
"#### Configuration ###########################\n");
568 printf(
"Number of processes = %d\n", nprocs);
569 printf(
"Iteration = %d\n", kmeans.n_iteration);
570 printf(
"Dimension = %d\n", kmeans.dim);
571 printf(
"Number of clusters = %d\n", kmeans.n_means);
572 printf(
"Number of points = %d\n", kmeans.n_points);
573 printf(
"##############################################\n");
575 MPI_Bcast(&(kmeans.n_iteration), 1, MPI_INT, 0, MPI_COMM_WORLD);
576 MPI_Bcast(&(kmeans.c_iteration), 1, MPI_INT, 0, MPI_COMM_WORLD);
577 MPI_Bcast(&(kmeans.grid_size), 1, MPI_INT, 0, MPI_COMM_WORLD);
578 MPI_Bcast(&(kmeans.dim), 1, MPI_INT, 0, MPI_COMM_WORLD);
579 MPI_Bcast(&(kmeans.n_points), 1, MPI_INT, 0, MPI_COMM_WORLD);
580 MPI_Bcast(&(kmeans.n_means), 1, MPI_INT, 0, MPI_COMM_WORLD);
581 MPI_Bcast(&p_iteration, 1, MPI_INT, 0, MPI_COMM_WORLD);
583 kmeans.means = (
int *)malloc((
size_t)kmeans.n_means * (size_t)kmeans.dim *
sizeof(
int));
585 MPI_Bcast(kmeans.means, kmeans.n_means * kmeans.dim, MPI_INT,
587 if (is_restart() != 1) {
589 kmeans.points = (
int *)malloc((
size_t)kmeans.n_points * (size_t)kmeans.dim *
sizeof(
int));
590 generate_randoms(kmeans.points, kmeans.n_points * kmeans.dim,
600 kmr_map_once(kvs_points, (
void *)&kmeans, kmr_ckpt, 0, generate_points);
601 print_progress(
"map_onece done", rank, 0);
604 struct timeval tv_s, tv_e;
605 double time_diff, total_time = 0.0;
606 while (kmeans.c_iteration < kmeans.n_iteration) {
608 if (kmeans.c_iteration > p_iteration) {
609 if (measure_time(&tv_s) == -1) {
610 MPI_Abort(MPI_COMM_WORLD, 1);
616 kmr_map(kvs_points, kvs_c2p, (
void *)&kmeans, kmr_inspect,
618 print_progress(
"map done", rank, kmeans.c_iteration);
623 print_progress(
"shuffle done", rank, kmeans.c_iteration);
627 KMR_KV_INTEGER, KMR_KV_OPAQUE);
628 kmr_reduce(kvs_c2p_s, kvs_cluster, (
void *)&kmeans, kmr_noopt,
630 print_progress(
"reduce done", rank, kmeans.c_iteration);
632 if (mr->ckpt_enable && kmeans.c_iteration > (p_iteration + 1)
633 && kmeans.c_iteration == 4) {
635 fprintf(stderr,
"Aborted on rank 0 for testing purpose.\n");
636 MPI_Abort(MPI_COMM_WORLD, 1);
647 print_progress(
"replicate done", rank, kmeans.c_iteration);
650 kmr_map(kvs_all_clusters, NULL, (
void *)&kmeans, kmr_noopt,
654 if (kmeans.c_iteration > p_iteration) {
655 if (measure_time(&tv_e) == -1) {
656 MPI_Abort(MPI_COMM_WORLD, 1);
659 time_diff = calc_time_diff(&tv_s, &tv_e);
660 total_time += time_diff;
662 print_means(kmeans.means, kmeans.n_means, kmeans.dim);
663 if (save_kmeans_state(&kmeans)) {
664 MPI_Abort(MPI_COMM_WORLD, 1);
669 if (mr->ckpt_enable && kmeans.c_iteration > p_iteration
670 && kmeans.c_iteration == 1) {
672 fprintf(stderr,
"Aborted on rank 0 for testing purpose.\n");
673 MPI_Abort(MPI_COMM_WORLD, 1);
677 kmeans.c_iteration += 1;
680 printf(
"Total elapse time: %f\n", total_time);
681 check_answer(&kmeans);
682 delete_kmeans_state();
686 if (is_restart() != 1) {
Key-Value Stream (abstract).
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs.
Options to Mapping, Shuffling, and Reduction.
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
int kmr_map_once(KMR_KVS *kvo, void *arg, struct kmr_option opt, _Bool rank_zero_only, kmr_mapfn_t m)
Maps once.
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Handy Copy of a Key-Value Field.
int kmr_fin(void)
Clears the environment.
#define kmr_init()
Sets up the environment.
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gather.
static void parse_args(char *, char *[])
Parses command parameters given for mapper and reducer arguments.
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).