44 #define DEF_NUM_ITERATION 10 45 #define DEF_NUM_POINTS 10000 46 #define DEF_NUM_MEANS 100 48 #define DEF_GRID_SIZE 1000 50 #define KMEANS_CKPT_FILE "./kmeans_ckpt" 51 #define KMEANS_CKPT_FILE_OLD "./kmeans_ckpt.bk" 53 struct kmr_option kmr_inspect = { .inspect = 1 };
83 static unsigned long int _rand_next = 1;
85 static int _rand(
void)
87 _rand_next = _rand_next * 1103515245 + 12345;
88 return (
unsigned int)(_rand_next/65536) % 32768;
91 static void _srand(
unsigned int seed)
98 measure_time(
struct timeval *tv)
100 MPI_Barrier(MPI_COMM_WORLD);
101 if (gettimeofday(tv, NULL) == -1) {
102 perror(
"gettimeofday");
109 calc_time_diff(
struct timeval *tv_s,
struct timeval *tv_e)
111 return ((
double)tv_e->tv_sec - (
double)tv_s->tv_sec)
112 + ((double)tv_e->tv_usec - (
double)tv_s->tv_usec) /1000000.0;
119 int ret = access(KMEANS_CKPT_FILE, R_OK);
133 while ((c = getopt(argc, argv,
"i:d:c:p:s:")) != EOF) {
136 kmeans->n_iteration = atoi(optarg);
139 kmeans->dim = atoi(optarg);
142 kmeans->n_means = atoi(optarg);
145 kmeans->n_points = atoi(optarg);
148 kmeans->grid_size = atoi(optarg);
151 printf(
"Usage: %s -i <num iteration> -d <vector dimension> " 152 "-c <num clusters> -p <num points> -s <max value>\n",
154 MPI_Abort(MPI_COMM_WORLD, 1);
158 if (kmeans->n_iteration <= 0 || kmeans->dim <= 0 || kmeans->n_means <= 0
159 || kmeans->n_points <= 0 || kmeans->grid_size <= 0) {
160 printf(
"Illegal argument value. All values must be numeric and " 162 MPI_Abort(MPI_COMM_WORLD, 1);
169 const char *tmp_file = KMEANS_CKPT_FILE
".tmp";
170 FILE *fp = fopen(tmp_file,
"w+");
172 fprintf(stderr,
"Failed to open kmeans state file.\n");
176 size_t ret = fwrite(&size,
sizeof(
size_t), 1, fp);
178 fprintf(stderr,
"Failed to write size of kmeans data.\n");
181 ret = fwrite(kmeans, size, 1, fp);
183 fprintf(stderr,
"Failed to write kmeans data.\n");
186 size =
sizeof(int) * (
size_t)kmeans->n_means * (size_t)kmeans->dim;
187 ret = fwrite(&size,
sizeof(
size_t), 1, fp);
189 fprintf(stderr,
"Failed to write size of means array.\n");
192 ret = fwrite(kmeans->means, size, 1, fp);
194 fprintf(stderr,
"Failed to write means array.\n");
201 rename(KMEANS_CKPT_FILE, KMEANS_CKPT_FILE_OLD);
202 rename(tmp_file, KMEANS_CKPT_FILE);
209 FILE *fp = fopen(KMEANS_CKPT_FILE,
"r");
211 fprintf(stderr,
"Failed to open kmeans state file.\n");
215 size_t ret = fread(&size,
sizeof(
size_t), 1, fp);
217 fprintf(stderr,
"Failed to read size of kmeans data.\n");
220 ret = fread(kmeans, size, 1, fp);
222 fprintf(stderr,
"Failed to read kmeans data.\n");
225 kmeans->points = NULL;
226 ret = fread(&size,
sizeof(
size_t), 1, fp);
228 fprintf(stderr,
"Failed to read size of means array.\n");
231 if (size != (
size_t)kmeans->n_means * (
size_t)kmeans->dim *
sizeof(
int)) {
232 fprintf(stderr,
"The size of means array is mismatch.\n");
235 kmeans->means = (
int*)malloc(size);
236 ret = fread(kmeans->means, size, 1, fp);
238 fprintf(stderr,
"Failed to read kmeans data.\n");
245 delete_kmeans_state()
247 remove(KMEANS_CKPT_FILE_OLD);
248 remove(KMEANS_CKPT_FILE);
253 generate_randoms(
int *pts,
int size,
int max_val)
256 for (i = 0; i < size; i++)
257 pts[i] = (_rand() % max_val);
269 int *points = kmeans->points;
271 for (i = 0; i < kmeans->n_points * kmeans->dim; i += kmeans->dim) {
272 struct kmr_kv_box nkv = { .klen = (int)
sizeof(
long),
273 .vlen = kmeans->dim * (int)
sizeof(
int),
275 .v.p = (
void *)&points[i] };
285 calc_sq_dist(
int *v1,
int *v2,
int dim)
288 unsigned int sum = 0;
289 for (i = 0; i < dim; i++)
290 sum += (
unsigned int)((v1[i] - v2[i]) * (v1[i] - v2[i]));
305 int dim = kmeans->dim;
306 int *means = kmeans->means;
307 int n_means = kmeans->n_means;
308 int *point = (
int *)kv.v.p;
310 unsigned int min_dist = calc_sq_dist(point, &means[0], dim);
312 for (i = 1; i < n_means; i++) {
313 unsigned int dist = calc_sq_dist(point, &means[i * dim], dim);
314 if (dist < min_dist) {
319 struct kmr_kv_box nkv = { .klen = (int)
sizeof(
long),
320 .vlen = dim * (int)
sizeof(
int),
322 .v.p = (
void *)point };
333 update_cluster(
const struct kmr_kv_box kv[],
const long n,
337 int cid = (int)kv[0].k.i;
339 int dim = kmeans->dim;
343 for (i = 0; i < dim; i++)
345 for (i = 0; i < n; i++) {
346 int *point = (
int *)kv[i].v.p;
347 for (j = 0; j < dim; j++) {
351 for (i = 0; i < dim; i++)
352 average[i] = (
int)(sum[i] / n);
354 struct kmr_kv_box nkv = { .klen =
sizeof(long),
355 .vlen = dim * (
int)
sizeof(int),
357 .v.p = (
void *)average };
371 int cid = (int)kv.k.i;
372 int *center = (
int *)kv.v.p;
374 int dim = kmeans->dim;
375 int *target = &kmeans->means[cid * dim];
377 for (i = 0; i < dim; i++)
378 target[i] = center[i];
384 print_means(
int *means,
int size,
int dim)
387 for (i = 0; i < size; i++) {
388 int *mean = &means[i * dim];
389 fprintf(stderr,
"( ");
390 for (j = 0; j < dim; j++)
391 fprintf(stderr,
"%d ", mean[j]);
392 fprintf(stderr,
")\n");
397 print_progress(
const char *msg,
int rank,
int n_itr)
399 MPI_Barrier(MPI_COMM_WORLD);
401 fprintf(stderr,
"ITR[%d]: %s\n", n_itr, msg);
403 MPI_Barrier(MPI_COMM_WORLD);
411 if (kmeans->nprocs == 2 &&
412 kmeans->dim == DEF_DIM &&
413 kmeans->grid_size == DEF_GRID_SIZE &&
414 kmeans->n_points == 10 &&
415 kmeans->n_means == 4 &&
416 kmeans->n_iteration >= DEF_NUM_ITERATION) {
419 "Case of 2 nodes, 10 points and 4 centers.\n" 420 "Check the answer...");
421 count = 4 * kmeans->dim;
422 answer = (
int*)malloc((
size_t)count *
sizeof(int));
435 }
else if (kmeans->nprocs == 4 &&
436 kmeans->dim == DEF_DIM &&
437 kmeans->grid_size == DEF_GRID_SIZE &&
438 kmeans->n_points == 10 &&
439 kmeans->n_means == 2 &&
440 kmeans->n_iteration >= DEF_NUM_ITERATION) {
443 "Case of 4 nodes, 10 points and 2 centers.\n" 444 "Check the answer...");
445 count = 2 * kmeans->dim;
446 answer = (
int*)malloc((
size_t)count *
sizeof(int));
453 }
else if (kmeans->nprocs == 4 &&
454 kmeans->dim == DEF_DIM &&
455 kmeans->grid_size == DEF_GRID_SIZE &&
456 kmeans->n_points == 10 &&
457 kmeans->n_means == 4 &&
458 kmeans->n_iteration >= DEF_NUM_ITERATION) {
461 "Case of 4 nodes, 10 points and 4 centers.\n" 462 "Check the answer...");
463 count = 4 * kmeans->dim;
464 answer = (
int*)malloc((
size_t)count *
sizeof(int));
477 }
else if (kmeans->nprocs == 8 &&
478 kmeans->dim == DEF_DIM &&
479 kmeans->grid_size == DEF_GRID_SIZE &&
480 kmeans->n_points == 10 &&
481 kmeans->n_means == 4 &&
482 kmeans->n_iteration >= DEF_NUM_ITERATION) {
485 "Case of 8 nodes, 10 points and 4 centers.\n" 486 "Check the answer...");
487 count = 4 * kmeans->dim;
488 answer = (
int*)malloc((
size_t)count *
sizeof(int));
505 for (
int i = 0; i < count; i++) {
506 if (kmeans->means[i] != answer[i]) {
512 printf(
"The answer is correct.\n");
514 printf(
"The answer is wrong.\n");
523 main(
int argc,
char **argv)
527 int p_iteration = -1;
529 MPI_Init(&argc, &argv);
530 MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
531 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
535 MPI_Info_create(&info);
536 MPI_Info_set(info,
"ckpt_enable",
"1");
538 MPI_Info_free(&info);
541 _srand((
unsigned int)(rank + 1));
545 load_kmeans_state(&kmeans);
546 p_iteration = kmeans.c_iteration;
547 kmeans.c_iteration = 0;
548 printf(
"*** Restarted ***\n\n");
550 kmeans.nprocs = nprocs;
551 kmeans.n_iteration = DEF_NUM_ITERATION;
552 kmeans.c_iteration = 0;
553 kmeans.grid_size = DEF_GRID_SIZE;
554 kmeans.dim = DEF_DIM;
555 kmeans.n_points = DEF_NUM_POINTS;
556 kmeans.n_means = DEF_NUM_MEANS;
560 kmeans.means = (
int *) malloc((
size_t)kmeans.n_means * (size_t)kmeans.dim *
sizeof(
int));
561 generate_randoms(kmeans.means, kmeans.n_means * kmeans.dim,
565 printf(
"#### Configuration ###########################\n");
566 printf(
"Number of processes = %d\n", nprocs);
567 printf(
"Iteration = %d\n", kmeans.n_iteration);
568 printf(
"Dimension = %d\n", kmeans.dim);
569 printf(
"Number of clusters = %d\n", kmeans.n_means);
570 printf(
"Number of points = %d\n", kmeans.n_points);
571 printf(
"##############################################\n");
573 MPI_Bcast(&(kmeans.n_iteration), 1, MPI_INT, 0, MPI_COMM_WORLD);
574 MPI_Bcast(&(kmeans.c_iteration), 1, MPI_INT, 0, MPI_COMM_WORLD);
575 MPI_Bcast(&(kmeans.grid_size), 1, MPI_INT, 0, MPI_COMM_WORLD);
576 MPI_Bcast(&(kmeans.dim), 1, MPI_INT, 0, MPI_COMM_WORLD);
577 MPI_Bcast(&(kmeans.n_points), 1, MPI_INT, 0, MPI_COMM_WORLD);
578 MPI_Bcast(&(kmeans.n_means), 1, MPI_INT, 0, MPI_COMM_WORLD);
579 MPI_Bcast(&p_iteration, 1, MPI_INT, 0, MPI_COMM_WORLD);
581 kmeans.means = (
int *)malloc((
size_t)kmeans.n_means * (size_t)kmeans.dim *
sizeof(
int));
583 MPI_Bcast(kmeans.means, kmeans.n_means * kmeans.dim, MPI_INT,
585 if (is_restart() != 1) {
587 kmeans.points = (
int *)malloc((
size_t)kmeans.n_points * (size_t)kmeans.dim *
sizeof(
int));
588 generate_randoms(kmeans.points, kmeans.n_points * kmeans.dim,
592 kmr_map_once(kvs_points, (
void *)&kmeans, kmr_noopt, 0, generate_points);
593 print_progress(
"map_onece done", rank, 0);
596 struct timeval tv_s, tv_e;
597 double time_diff, total_time = 0.0;
598 while (kmeans.c_iteration < kmeans.n_iteration) {
600 if (kmeans.c_iteration > p_iteration) {
601 if (measure_time(&tv_s) == -1) {
602 MPI_Abort(MPI_COMM_WORLD, 1);
608 kmr_map(kvs_points, kvs_c2p, (
void *)&kmeans, kmr_inspect,
610 print_progress(
"map done", rank, kmeans.c_iteration);
615 print_progress(
"shuffle done", rank, kmeans.c_iteration);
619 KMR_KV_INTEGER, KMR_KV_OPAQUE);
620 kmr_reduce(kvs_c2p_s, kvs_cluster, (
void *)&kmeans, kmr_noopt,
622 print_progress(
"reduce done", rank, kmeans.c_iteration);
624 if (mr->ckpt_enable && kmeans.c_iteration > (p_iteration + 1)
625 && kmeans.c_iteration == 4) {
627 fprintf(stderr,
"Aborted on rank 0 for testing purpose.\n");
628 MPI_Abort(MPI_COMM_WORLD, 1);
636 print_progress(
"replicate done", rank, kmeans.c_iteration);
639 kmr_map(kvs_all_clusters, NULL, (
void *)&kmeans, kmr_noopt,
643 if (kmeans.c_iteration > p_iteration) {
644 if (measure_time(&tv_e) == -1) {
645 MPI_Abort(MPI_COMM_WORLD, 1);
648 time_diff = calc_time_diff(&tv_s, &tv_e);
649 total_time += time_diff;
651 print_means(kmeans.means, kmeans.n_means, kmeans.dim);
652 if (save_kmeans_state(&kmeans)) {
653 MPI_Abort(MPI_COMM_WORLD, 1);
658 if (mr->ckpt_enable && kmeans.c_iteration > p_iteration
659 && kmeans.c_iteration == 1) {
661 fprintf(stderr,
"Aborted on rank 0 for testing purpose.\n");
662 MPI_Abort(MPI_COMM_WORLD, 1);
666 kmeans.c_iteration += 1;
669 printf(
"Total elapse time: %f\n", total_time);
670 check_answer(&kmeans);
671 delete_kmeans_state();
675 if (is_restart() != 1) {
Key-Value Stream (abstract).
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs.
Options for mapping, shuffling, and reduction.
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
int kmr_map_once(KMR_KVS *kvo, void *arg, struct kmr_option opt, _Bool rank_zero_only, kmr_mapfn_t m)
Maps once.
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Handy Copy of a Key-Value Field.
int kmr_fin(void)
Clears the environment.
#define kmr_init()
Sets up the environment.
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gather.
static void parse_args(char *, char *[])
Parses command parameters given for mapper and reducer arguments.
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).