12 #define DEF_NUM_ITERATION 10 13 #define DEF_NUM_POINTS 10000 14 #define DEF_NUM_MEANS 100 16 #define DEF_GRID_SIZE 1000 18 struct kmr_option kmr_inspect = { .inspect = 1 };
38 measure_time(
struct timeval *tv)
40 MPI_Barrier(MPI_COMM_WORLD);
41 if (gettimeofday(tv, NULL) == -1) {
42 perror(
"gettimeofday");
49 calc_time_diff(
struct timeval *tv_s,
struct timeval *tv_e)
51 return ((
double)tv_e->tv_sec - (
double)tv_s->tv_sec)
52 + ((double)tv_e->tv_usec - (
double)tv_s->tv_usec) /1000000.0;
62 while ((c = getopt(argc, argv,
"i:d:c:p:s:")) != EOF) {
65 kmeans->n_iteration = atoi(optarg);
68 kmeans->dim = atoi(optarg);
71 kmeans->n_means = atoi(optarg);
74 kmeans->n_points = atoi(optarg);
77 kmeans->grid_size = atoi(optarg);
80 printf(
"Usage: %s -i <num iteration> -d <vector dimension> " 81 "-c <num clusters> -p <num points> -s <max value>\n", argv[0]);
82 MPI_Abort(MPI_COMM_WORLD, 1);
86 if (kmeans->n_iteration <= 0 || kmeans->dim <= 0 || kmeans->n_means <= 0
87 || kmeans->n_points <= 0 || kmeans->grid_size <= 0) {
88 printf(
"Illegal argument value. All values must be numeric and " 90 MPI_Abort(MPI_COMM_WORLD, 1);
97 statistics_kv(
int iteration,
long kv_cnt)
102 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
103 MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
105 data = (
long *)malloc((
size_t)nprocs *
sizeof(long));
109 MPI_Gather(&kv_cnt, 1, MPI_LONG, data, 1, MPI_LONG, 0, MPI_COMM_WORLD);
112 int i, min_idx, max_idx;
113 long sum = 0, min = LONG_MAX, max = 0;
114 for (i = 0; i < nprocs; i++) {
125 double average = (double)sum / nprocs;
126 double variance = 0.0;
127 for (i = 0; i < nprocs; i++) {
128 variance += ((double)data[i] - average) * ((double)data[i] - average);
131 double std_dev = sqrt(variance);
134 printf(
"Itr[%2d]: KV statistics\n" 135 "\tMin # of KVs is %10ld on Rank[%05d]\n" 136 "\tMax # of KVs is %10ld on Rank[%05d]\n" 137 "\tStd Dev of KVs is %f\n",
138 iteration, min, min_idx, max, max_idx, std_dev);
140 printf(
"%d,%ld,%ld,%f\n", iteration, min, max, std_dev);
149 generate_randoms(
int *pts,
int size,
int max_val)
152 for (i = 0; i < size; i++)
153 pts[i] = (rand() % max_val);
165 int *points = kmeans->points;
167 for (i = 0; i < kmeans->n_points * kmeans->dim; i += kmeans->dim) {
168 struct kmr_kv_box nkv = { .klen = (int)
sizeof(
long),
169 .vlen = kmeans->dim * (int)
sizeof(
int),
171 .v.p = (
void *)&points[i] };
181 calc_sq_dist(
int *v1,
int *v2,
int dim)
184 unsigned int sum = 0;
185 for (i = 0; i < dim; i++)
186 sum += (
unsigned int)((v1[i] - v2[i]) * (v1[i] - v2[i]));
201 int dim = kmeans->dim;
202 int *means = kmeans->means;
203 int n_means = kmeans->n_means;
204 int *point = (
int *)kv.v.p;
206 unsigned int min_dist = calc_sq_dist(point, &means[0], dim);
208 for (i = 1; i < n_means; i++) {
209 unsigned int dist = calc_sq_dist(point, &means[i * dim], dim);
210 if (dist < min_dist) {
215 struct kmr_kv_box nkv = { .klen = (int)
sizeof(
long),
216 .vlen = dim * (int)
sizeof(
int),
218 .v.p = (
void *)point };
229 update_cluster(
const struct kmr_kv_box kv[],
const long n,
233 int cid = (int)kv[0].k.i;
235 int dim = kmeans->dim;
239 for (i = 0; i < dim; i++)
241 for (i = 0; i < n; i++) {
242 int *point = (
int *)kv[i].v.p;
243 for (j = 0; j < dim; j++) {
247 for (i = 0; i < dim; i++)
248 average[i] = (
int)(sum[i] / n);
250 struct kmr_kv_box nkv = { .klen = (int)
sizeof(
long),
251 .vlen = dim * (int)
sizeof(
int),
253 .v.p = (
void *)average };
267 int cid = (int)kv.k.i;
268 int *center = (
int *)kv.v.p;
270 int dim = kmeans->dim;
271 int *target = &kmeans->means[cid * dim];
273 for (i = 0; i < dim; i++)
274 target[i] = center[i];
279 #if defined(FULL_DEBUG) 288 char buf[100], buf2[10];
290 int *point = (
int *)kv.v.p;
292 memset(buf, 0, strlen(buf));
293 for (i = 0; i < kmeans->dim; i++) {
294 sprintf(buf2,
"%d ", point[i]);
295 strncat(buf, buf2, strlen(buf2));
297 fprintf(stderr,
"( %s)\n", buf);
306 show_points_w_clusters(
const struct kmr_kv_box kv,
310 char buf[100], buf2[10];
312 int cid = (int)kv.k.i;
313 int *point = (
int *)kv.v.p;
316 memset(buf, 0, strlen(buf));
317 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
318 for (i = 0; i < kmeans->dim; i++) {
319 sprintf(buf2,
"%d ", point[i]);
320 strncat(buf, buf2, strlen(buf2));
322 fprintf(stderr,
"RANK[%d]: %3d ( %s)\n", rank, cid, buf);
328 #if defined(DEBUG) || defined(FULL_DEBUG) 330 print_means(
int *means,
int size,
int dim)
333 for (i = 0; i < size; i++) {
334 int *mean = &means[i * dim];
336 for (j = 0; j < dim; j++)
337 printf(
"%d ", mean[j]);
346 main(
int argc,
char **argv)
351 MPI_Init(&argc, &argv);
352 MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
353 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
359 srand((
unsigned int)((rank + 1) * getpid()));
361 kmeans.n_iteration = DEF_NUM_ITERATION;
362 kmeans.grid_size = DEF_GRID_SIZE;
363 kmeans.dim = DEF_DIM;
365 kmeans.n_points = DEF_NUM_POINTS;
366 kmeans.n_means = DEF_NUM_MEANS;
370 printf(
"#### Configuration ###########################\n");
371 printf(
"Number of processes = %d\n", nprocs);
372 printf(
"Iteration = %d\n", kmeans.n_iteration);
373 printf(
"Dimension = %d\n", kmeans.dim);
374 printf(
"Number of clusters = %d\n", kmeans.n_means);
375 printf(
"Number of points = %d\n", kmeans.n_points);
376 printf(
"##############################################\n");
378 MPI_Bcast(&(kmeans.n_iteration), 1, MPI_INT, 0, MPI_COMM_WORLD);
379 MPI_Bcast(&(kmeans.grid_size), 1, MPI_INT, 0, MPI_COMM_WORLD);
380 MPI_Bcast(&(kmeans.dim), 1, MPI_INT, 0, MPI_COMM_WORLD);
381 MPI_Bcast(&(kmeans.n_points), 1, MPI_INT, 0, MPI_COMM_WORLD);
382 MPI_Bcast(&(kmeans.n_means), 1, MPI_INT, 0, MPI_COMM_WORLD);
384 kmeans.means = (
int *)malloc((
size_t)kmeans.n_means * (size_t)kmeans.dim *
sizeof(
int));
386 generate_randoms(kmeans.means, kmeans.n_means * kmeans.dim,
389 MPI_Bcast(kmeans.means, kmeans.n_means * kmeans.dim, MPI_INT,
392 kmeans.points = (
int *)malloc((
size_t)kmeans.n_points * (size_t)kmeans.dim *
sizeof(
int));
393 generate_randoms(kmeans.points, kmeans.n_points * kmeans.dim,
396 struct timeval tv_ts, tv_te, tv_s, tv_e;
398 if (measure_time(&tv_ts) == -1) {
399 MPI_Abort(MPI_COMM_WORLD, 1);
404 while (itr < kmeans.n_iteration) {
406 kmr_map_once(kvs_points, (
void *)&kmeans, kmr_noopt, 0, generate_points);
412 if (measure_time(&tv_s) == -1) {
413 MPI_Abort(MPI_COMM_WORLD, 1);
418 printf(
"Iteration[%2d]: Means\n", itr);
419 print_means(kmeans.means, kmeans.n_means, kmeans.dim);
423 fprintf(stderr,
"Iteration[%2d]: Points\n", itr);
424 kmr_map(kvs_points, NULL, (
void *)&kmeans, kmr_inspect, show_points);
429 kmr_map(kvs_points, kvs_c2p, (
void *)&kmeans, kmr_noopt, calc_cluster);
431 fprintf(stderr,
"Iteration[%2d]: Map result\n", itr);
432 kmr_map(kvs_c2p, NULL, (
void *)&kmeans, kmr_inspect,
433 show_points_w_clusters);
440 fprintf(stderr,
"Iteration[%2d]: Shuffle result\n", itr);
441 kmr_map(kvs_c2p_s, NULL, (
void *)&kmeans, kmr_inspect,
442 show_points_w_clusters);
447 kmr_reduce(kvs_c2p_s, kvs_cluster, (
void *)&kmeans, kmr_noopt,
450 fprintf(stderr,
"Iteration[%2d]: Reduce result\n", itr);
451 kmr_map(kvs_cluster, NULL, (
void *)&kmeans, kmr_inspect,
452 show_points_w_clusters);
460 fprintf(stderr,
"Iteration[%2d]: Replicate result\n", itr);
461 kmr_map(kvs_all_clusters, NULL, (
void *)&kmeans, kmr_inspect,
462 show_points_w_clusters);
466 kmr_map(kvs_all_clusters, NULL, (
void *)&kmeans, kmr_noopt, copy_center);
470 if (measure_time(&tv_e) == -1) {
471 MPI_Abort(MPI_COMM_WORLD, 1);
474 double time_diff = calc_time_diff(&tv_s, &tv_e);
475 printf(
"Iteration[%2d]: Elapse time: %f\n", itr, time_diff);
480 if (measure_time(&tv_te) == -1) {
481 MPI_Abort(MPI_COMM_WORLD, 1);
485 double total_time = calc_time_diff(&tv_ts, &tv_te);
486 printf(
"Total elapse time: %f\n", total_time);
487 printf(
"Cluster corrdinates\n");
488 for (
int i = 0; i < kmeans.n_means * kmeans.dim; i += kmeans.dim) {
490 for (
int j = 0; j < kmeans.dim; j++) {
491 printf(
"%d, ", kmeans.means[i + j]);
Key-Value Stream (abstract).
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs.
Options to Mapping, Shuffling, and Reduction.
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
int kmr_map_once(KMR_KVS *kvo, void *arg, struct kmr_option opt, _Bool rank_zero_only, kmr_mapfn_t m)
Maps once.
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Handy Copy of a Key-Value Field.
int kmr_fin(void)
Clears the environment.
#define kmr_init()
Sets up the environment.
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gat...
static void parse_args(char *, char *[])
Parses command parameters given for mapper and reducer arguments.
int kmr_local_element_count(KMR_KVS *kvs, long *v)
Gets the number of key-value pairs locally on each rank.
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).