/*
 * Project header, default run parameters, and the start of measure_time().
 *
 * measure_time(tv): calls MPI_Barrier(MPI_COMM_WORLD) and then stores the
 * current time of day in *tv with gettimeofday(); a gettimeofday() failure is
 * reported through perror().  Callers in this file compare the result with -1.
 *
 * NOTE(review): the leading numbers are line numbers carried over from the
 * original listing, and some original lines are not visible here (for
 * example, no definition of DEF_DIM appears although main() uses it).
 */
10 #include "mapreduce.h" 13 #define DEF_NUM_ITERATION 10 14 #define DEF_NUM_POINTS 10000 15 #define DEF_NUM_MEANS 100 17 #define DEF_GRID_SIZE 1000 39 measure_time(
struct timeval *tv)
/* Barrier first, then read the clock -- presumably so that all ranks take
 * their timestamp at about the same point; confirm intent. */
41 MPI_Barrier(MPI_COMM_WORLD);
42 if (gettimeofday(tv, NULL) == -1) {
43 perror(
"gettimeofday");
/*
 * Returns the time elapsed between two timestamps, in seconds.
 *
 * tv_s  start timestamp
 * tv_e  end timestamp
 *
 * The seconds and microseconds fields are subtracted separately (after
 * conversion to double) and the microsecond difference is scaled by 1e-6, so
 * the result is negative when tv_e precedes tv_s.
 */
double
calc_time_diff(struct timeval *tv_s, struct timeval *tv_e)
{
    double whole_secs = (double)tv_e->tv_sec - (double)tv_s->tv_sec;
    double micro_secs = (double)tv_e->tv_usec - (double)tv_s->tv_usec;

    return whole_secs + micro_secs / 1000000.0;
}
/*
 * Command-line option handling (the enclosing function header is not visible
 * in this excerpt).  Options are read with getopt() using the option string
 * "i:d:c:p:s:" and each argument is converted with atoi() into a field of
 * *kmeans.  Per the usage text: -i iterations, -d vector dimension,
 * -c clusters, -p points, -s max value; the case labels that tie each option
 * letter to an assignment are not visible -- presumably in the same order.
 *
 * NOTE(review): atoi() cannot report conversion errors; non-numeric input
 * yields 0, which the "<= 0" validation below then rejects.  Trailing garbage
 * such as "12abc" is accepted as 12.  strtol() with an end-pointer check
 * would detect that.
 */
63 while ((c = getopt(argc, argv,
"i:d:c:p:s:")) != EOF) {
66 kmeans->n_iteration = atoi(optarg);
69 kmeans->dim = atoi(optarg);
72 kmeans->n_means = atoi(optarg);
75 kmeans->n_points = atoi(optarg);
78 kmeans->grid_size = atoi(optarg);
/* Usage message followed by an abort of the whole MPI job. */
81 printf(
"Usage: %s -i <num iteration> -d <vector dimension> " 82 "-c <num clusters> -p <num points> -s <max value>\n", argv[0]);
83 MPI_Abort(MPI_COMM_WORLD, 1);
/* Every parameter must be strictly positive; otherwise the job is aborted. */
87 if (kmeans->n_iteration <= 0 || kmeans->dim <= 0 || kmeans->n_means <= 0
88 || kmeans->n_points <= 0 || kmeans->grid_size <= 0) {
89 printf(
"Illegal argument value. All values must be numeric and " 91 MPI_Abort(MPI_COMM_WORLD, 1);
/*
 * statistics_kv(iteration, kv_cnt): collects every rank's key-value count on
 * rank 0 with MPI_Gather and prints min / max (with the owning rank index)
 * and a standard-deviation figure for the given iteration, in two formats
 * (a multi-line report and a comma-separated line).
 *
 * iteration  iteration number used only in the printed output
 * kv_cnt     this rank's key-value count
 *
 * NOTE(review): several lines of this function are not visible in this
 * excerpt, including the body of the min/max/sum loop, any check of the
 * malloc() result, the conditions selecting which rank and which output
 * format run, and the release of `data`.
 */
97 statistics_kv(
int iteration,
long kv_cnt)
102 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
103 MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
/* One slot per rank for the gathered counts. */
105 data = (
long *)malloc(nprocs *
sizeof(
long));
109 MPI_Gather(&kv_cnt, 1, MPI_LONG, data, 1, MPI_LONG, 0, MPI_COMM_WORLD);
/* NOTE(review): min_idx / max_idx have no initializer here; they are
 * presumably assigned inside the loop below -- confirm. */
112 int i, min_idx, max_idx;
113 long sum = 0, min = LONG_MAX, max = 0;
114 for (i = 0; i < nprocs; i++) {
125 double average = (double)sum / nprocs;
126 double variance = 0.0;
/* Accumulates the squared deviation of each rank's count from the mean. */
127 for (i = 0; i < nprocs; i++) {
128 variance += ((double)data[i] - average) * ((double)data[i] - average);
/* NOTE(review): no division of `variance` by nprocs is visible between the
 * loop and this sqrt(); confirm it exists in the full source, otherwise the
 * printed value is the root of the sum of squares, not a standard deviation. */
131 double std_dev = sqrt(variance);
134 printf(
"Itr[%2d]: KV statistics\n" 135 "\tMin # of KVs is %10ld on Rank[%05d]\n" 136 "\tMax # of KVs is %10ld on Rank[%05d]\n" 137 "\tStd Dev of KVs is %f\n",
138 iteration, min, min_idx, max, max_idx, std_dev);
140 printf(
"%d,%ld,%ld,%f\n", iteration, min, max, std_dev);
/*
 * Fills pts[0 .. size-1] with pseudo-random integers drawn from rand().
 *
 * pts      destination array with room for at least `size` ints
 * size     number of elements to write; nothing is written when size <= 0
 * max_val  modulus applied to rand(); for a positive max_val every element
 *          lies in [0, max_val)
 *
 * A max_val of 0 would make `rand() % max_val` a division by zero, so in
 * that case the elements are set to 0 instead.  The sequence depends on the
 * caller's srand() seed.
 */
void
generate_randoms(int *pts, int size, int max_val)
{
    int idx;

    for (idx = 0; idx < size; idx++) {
        /* Guard the modulus: x % 0 is undefined behaviour. */
        pts[idx] = (max_val != 0) ? (rand() % max_val) : 0;
    }
}
/*
 * generate_points(itask, kv, ptr): map callback (passed to mr->map() in
 * main) that emits one key-value pair per point held in kmeans->points.
 *
 * itask  task index supplied by the library; no use of it is visible here
 * kv     key-value object the pairs are added to
 * ptr    user pointer; presumably the k-means parameter struct that `kmeans`
 *        is derived from (the cast is not visible in this excerpt)
 *
 * Key:   the int offset `i` of the point's first coordinate in the flat
 *        points array (sizeof(int) bytes).
 * Value: the point's `dim` coordinates (dim * sizeof(int) bytes).
 */
159 generate_points(
int itask, KeyValue *kv,
void *ptr)
163 int *points = kmeans->points;
/* `i` advances by dim, so it always indexes the first coordinate of a point. */
165 for (i = 0; i < kmeans->n_points * kmeans->dim; i += kmeans->dim) {
166 kv->add((
char *)&i,
sizeof(
int),
167 (
char *)&points[i], kmeans->dim * (
int)
sizeof(
int));
/*
 * Returns the squared Euclidean distance between two integer vectors.
 *
 * v1, v2  vectors with at least `dim` elements each
 * dim     number of coordinates to compare; 0 (or less) yields 0
 *
 * Each coordinate difference is taken in a 64-bit type and squared in
 * unsigned 64-bit arithmetic, so neither the subtraction nor the
 * multiplication can overflow a signed int (which would be undefined
 * behaviour for differences above 46340).  The running total is an unsigned
 * int and therefore wraps modulo 2^32 if the true distance exceeds UINT_MAX.
 */
unsigned int
calc_sq_dist(int *v1, int *v2, int dim)
{
    unsigned int total = 0;
    int axis;

    for (axis = 0; axis < dim; axis++) {
        long long delta = (long long)v1[axis] - (long long)v2[axis];
        unsigned long long magnitude =
            (delta < 0) ? (unsigned long long)(-delta)
                        : (unsigned long long)delta;

        /* magnitude < 2^32, so the square fits in 64 bits without wrapping. */
        total += (unsigned int)(magnitude * magnitude);
    }
    return total;
}
/*
 * calc_cluster(...): map callback (passed to mr->map(mr, ...) in main) that
 * assigns one point to a cluster and emits the pair (cluster index, point).
 *
 * value  the point's coordinates, read as `dim` ints
 * kv     key-value object the result is added to
 * ptr    user pointer; presumably the k-means parameter struct that `kmeans`
 *        is derived from (the cast is not visible in this excerpt)
 *
 * The squared distance to mean 0 is the starting minimum and means
 * 1 .. n_means-1 are compared against it with calc_sq_dist().  The lines that
 * declare and update `min_idx` are not visible here; presumably it tracks the
 * index of the closest mean -- confirm against the full source.
 */
189 calc_cluster(uint64_t itask,
char *key,
int keybytes,
190 char *value,
int valuebytes, KeyValue *kv,
void *ptr)
194 int dim = kmeans->dim;
195 int *means = kmeans->means;
196 int n_means = kmeans->n_means;
197 int *point = (
int *)value;
199 unsigned int min_dist = calc_sq_dist(point, &means[0], dim);
/* means is a flat array: mean i occupies means[i*dim .. i*dim + dim-1]. */
201 for (i = 1; i < n_means; i++) {
202 unsigned int dist = calc_sq_dist(point, &means[i * dim], dim);
203 if (dist < min_dist) {
/* Emit key = min_idx (one int), value = the point's dim ints. */
208 kv->add((
char *)&min_idx,
sizeof(
int),
209 (
char *)point, dim * (
int)
sizeof(
int));
/*
 * update_cluster(...): reduce callback (passed to mr->reduce() in main) that
 * averages all points grouped under one key and emits (cid, average).
 *
 * multivalue  read as nvalues consecutive records of `dim` ints each
 * nvalues     number of points in the group; used as the divisor
 * kv          key-value object the result is added to
 * ptr         user pointer; presumably the k-means parameter struct that
 *             `kmeans` is derived from (the cast is not visible here)
 *
 * The declarations of `cid` and `average`, and the body of the first loop
 * (presumably zeroing `average`), are not visible in this excerpt.
 *
 * NOTE(review): the division below is performed with `/=` on `average[i]`;
 * if `average` is an int array (its declaration is not visible) the result
 * is truncated and the per-coordinate sum can overflow for large groups.
 */
217 update_cluster(
char *key,
int keybytes,
char *multivalue,
int nvalues,
218 int *valuebytes, KeyValue *kv,
void *ptr)
223 int *points = (
int *)multivalue;
224 int dim = kmeans->dim;
227 for (i = 0; i < dim; i++)
/* Sum each coordinate over all points of the group. */
229 for (i = 0; i < nvalues; i++) {
230 int *point = &points[i * dim];
231 for (j = 0; j < dim; j++) {
232 average[j] += point[j];
/* Turn the per-coordinate sums into means. */
235 for (i = 0; i < dim; i++)
236 average[i] /= nvalues;
/* Emit key = cid (one int), value = dim * sizeof(int) bytes of `average`. */
238 kv->add((
char *)&cid,
sizeof(
int),
239 (
char *)average, dim * (
int)
sizeof(
int));
/*
 * copy_center(...): map callback (passed to mr->map(mr, ...) in main) that
 * copies one cluster centre from a key-value pair into kmeans->means.
 *
 * value  the centre's coordinates, read as `dim` ints
 * ptr    user pointer; presumably the k-means parameter struct that `kmeans`
 *        is derived from (the cast is not visible in this excerpt)
 *
 * The destination is row `cid` of the flat means array.  The line that sets
 * `cid` is not visible here; presumably it is read from `key` -- confirm.
 * No key-value pair is added to `kv` in the visible lines.
 */
246 copy_center(uint64_t itask,
char *key,
int keybytes,
247 char *value,
int valuebytes, KeyValue *kv,
void *ptr)
251 int *center = (
int *)value;
253 int dim = kmeans->dim;
254 int *target = &kmeans->means[cid * dim];
256 for (i = 0; i < dim; i++)
257 target[i] = center[i];
/*
 * Debug-only section (compiled when DEBUG or FULL_DEBUG is defined).
 *
 * show_points(...): scan callback (passed to mr->scan() in main) that prints
 * one point to stderr as "( x0 x1 ... )".
 *
 * value  the point's coordinates, read as kmeans->dim ints
 * ptr    user pointer; presumably the k-means parameter struct that `kmeans`
 *        is derived from (the cast is not visible in this excerpt)
 */
260 #if defined(DEBUG) || defined(FULL_DEBUG) 265 show_points(
char *key,
int keybytes,
char *value,
int valuebytes,
void *ptr)
268 char buf[100], buf2[10];
270 int *point = (
int *)value;
/* NOTE(review): buf has no visible initialization before this call, so
 * strlen(buf) reads indeterminate bytes and the number of bytes cleared is
 * unpredictable; sizeof(buf) was presumably intended. */
272 memset(buf, 0, strlen(buf));
273 for (i = 0; i < kmeans->dim; i++) {
/* NOTE(review): "%d " can need up to 13 bytes for an int, but buf2 holds 10. */
274 sprintf(buf2,
"%d ", point[i]);
/* NOTE(review): the bound is the source length, not the space left in buf,
 * so nothing prevents buf[100] from overflowing when dim is large. */
275 strncat(buf, buf2, strlen(buf2));
277 fprintf(stderr,
"( %s)\n", buf);
/*
 * show_points_w_clusters(...): scan callback (passed to mr->scan() in main)
 * that prints one key-value pair to stderr as
 * "RANK[r]: cid ( x0 x1 ... )", where r is this process's rank.
 *
 * value  the point's coordinates, read as kmeans->dim ints
 * ptr    user pointer; presumably the k-means parameter struct that `kmeans`
 *        is derived from (the cast is not visible in this excerpt)
 *
 * The line that sets `cid` is not visible here; presumably it is read from
 * `key` -- confirm.
 */
284 show_points_w_clusters(
char *key,
int keybytes,
285 char *value,
int valuebytes,
void *ptr)
288 char buf[100], buf2[10];
291 int *point = (
int *)value;
/* NOTE(review): buf has no visible initialization before this call, so
 * strlen(buf) reads indeterminate bytes; sizeof(buf) was presumably intended. */
294 memset(buf, 0, strlen(buf));
295 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
296 for (i = 0; i < kmeans->dim; i++) {
/* NOTE(review): "%d " can need up to 13 bytes for an int, but buf2 holds 10. */
297 sprintf(buf2,
"%d ", point[i]);
/* NOTE(review): the bound is the source length, not the space left in buf,
 * so buf[100] can overflow when dim is large. */
298 strncat(buf, buf2, strlen(buf2));
300 fprintf(stderr,
"RANK[%d]: %3d ( %s)\n", rank, cid, buf);
/*
 * show_points_w_clusters_m(...): scan callback for key-multivalue pairs
 * (passed to mr->scan() in main) that prints every point of a group to
 * stderr, one line per point, as "RANK[r]: cid ( x0 x1 ... )".
 *
 * multivalue  read as nvalues consecutive records of kmeans->dim ints each
 * nvalues     number of points in the group
 *
 * The remaining parameters and the lines that set `cid` and `kmeans` are not
 * visible in this excerpt.
 */
307 show_points_w_clusters_m(
char *key,
int keybytes,
308 char *multivalue,
int nvalues,
int *valuebytes,
312 char buf[100], buf2[10];
315 int *points = (
int *)multivalue;
318 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
319 for (j = 0; j < nvalues; j++) {
/* NOTE(review): on the first pass buf has no visible initialization, so
 * strlen(buf) reads indeterminate bytes; on later passes it clears only the
 * previous text.  sizeof(buf) was presumably intended. */
320 memset(buf, 0, strlen(buf));
321 int *point = &points[j * kmeans->dim];
322 for (i = 0; i < kmeans->dim; i++) {
/* NOTE(review): "%d " can need up to 13 bytes for an int, but buf2 holds 10. */
323 sprintf(buf2,
"%d ", point[i]);
/* NOTE(review): the bound is the source length, not the space left in buf,
 * so buf[100] can overflow when dim is large. */
324 strncat(buf, buf2, strlen(buf2));
326 fprintf(stderr,
"RANK[%d]: %3d ( %s)\n", rank, cid, buf);
/*
 * print_means(means, size, dim): writes the coordinates of each mean to
 * stdout with printf("%d ").
 *
 * means  flat array; mean i occupies means[i*dim .. i*dim + dim-1]
 * size   number of means
 * dim    number of coordinates per mean
 *
 * Any per-mean prefix, suffix or newline output is not visible in this
 * excerpt.
 */
331 print_means(
int *means,
int size,
int dim)
334 for (i = 0; i < size; i++) {
335 int *mean = &means[i * dim];
337 for (j = 0; j < dim; j++)
338 printf(
"%d ", mean[j]);
/*
 * main(argc, argv): driver of the k-means run.
 *
 * Visible steps: initialise MPI; seed rand(); load default parameters; print
 * the configuration; broadcast the parameters from rank 0; allocate and
 * randomly fill the means and the points; then, for each iteration, build a
 * MapReduce object, generate point key-values, assign points to clusters,
 * reduce each cluster to a new centre and copy the centres back into
 * kmeans.means, timing each iteration and the whole loop.
 *
 * NOTE(review): many lines are not visible in this excerpt, including the
 * argument-parsing call, the rank / DEBUG conditions around the printing,
 * the collate / gather / broadcast calls named in the debug messages, the
 * increment of `itr`, destruction of `mr`, the frees, and MPI_Finalize.
 */
347 main(
int argc,
char **argv)
352 MPI_Init(&argc, &argv);
353 MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
354 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
/* The seed mixes rank and process id, so ranks do not share a seed value
 * unless the products happen to coincide. */
357 srand((rank + 1) * getpid());
/* Defaults; DEF_DIM's definition is not visible in this excerpt. */
359 kmeans.n_iteration = DEF_NUM_ITERATION;
360 kmeans.grid_size = DEF_GRID_SIZE;
361 kmeans.dim = DEF_DIM;
363 kmeans.n_points = DEF_NUM_POINTS;
364 kmeans.n_means = DEF_NUM_MEANS;
368 printf(
"#### Configuration ###########################\n");
369 printf(
"Number of processes = %d\n", nprocs);
370 printf(
"Iteration = %d\n", kmeans.n_iteration);
371 printf(
"Dimension = %d\n", kmeans.dim);
372 printf(
"Number of clusters = %d\n", kmeans.n_means);
373 printf(
"Number of points = %d\n", kmeans.n_points);
374 printf(
"##############################################\n");
/* Rank 0's parameter values are distributed to every rank. */
376 MPI_Bcast(&(kmeans.n_iteration), 1, MPI_INT, 0, MPI_COMM_WORLD);
377 MPI_Bcast(&(kmeans.grid_size), 1, MPI_INT, 0, MPI_COMM_WORLD);
378 MPI_Bcast(&(kmeans.dim), 1, MPI_INT, 0, MPI_COMM_WORLD);
379 MPI_Bcast(&(kmeans.n_points), 1, MPI_INT, 0, MPI_COMM_WORLD);
380 MPI_Bcast(&(kmeans.n_means), 1, MPI_INT, 0, MPI_COMM_WORLD);
/* NOTE(review): no check of either malloc() result is visible, and the size
 * products are computed from user-controlled ints. */
382 kmeans.means = (
int *)malloc(kmeans.n_means * kmeans.dim *
sizeof(
int));
384 generate_randoms(kmeans.means, kmeans.n_means * kmeans.dim,
387 MPI_Bcast(kmeans.means, kmeans.n_means * kmeans.dim, MPI_INT,
390 kmeans.points = (
int *)malloc(kmeans.n_points * kmeans.dim *
sizeof(
int));
391 generate_randoms(kmeans.points, kmeans.n_points * kmeans.dim,
/* tv_ts / tv_te bracket the whole loop; tv_s / tv_e bracket one iteration. */
394 struct timeval tv_ts, tv_te, tv_s, tv_e;
396 if (measure_time(&tv_ts) == -1) {
397 MPI_Abort(MPI_COMM_WORLD, 1);
402 while (itr < kmeans.n_iteration) {
/* NOTE(review): a new MapReduce object is created on every iteration; the
 * matching delete is not visible in this excerpt -- confirm it exists. */
403 MapReduce *mr =
new MapReduce(MPI_COMM_WORLD);
/* Emit the points as key-values, then report their distribution over ranks. */
408 mr->map(nprocs, generate_points, (
void *)&kmeans);
409 statistics_kv(itr + 1, mr->kv->nkv);
412 if (measure_time(&tv_s) == -1) {
413 MPI_Abort(MPI_COMM_WORLD, 1);
418 printf(
"Iteration[%2d]: Means\n", itr);
419 print_means(kmeans.means, kmeans.n_means, kmeans.dim);
423 fprintf(stderr,
"Iteration[%2d]: Points\n", itr);
424 mr->scan(show_points, (
void *)&kmeans);
/* Map step: each point is re-keyed by a cluster index (see calc_cluster). */
428 mr->map(mr, calc_cluster, (
void *)&kmeans);
430 fprintf(stderr,
"Iteration[%2d]: Map result\n", itr);
431 mr->scan(show_points_w_clusters, (
void *)&kmeans);
438 fprintf(stderr,
"Iteration[%2d]: Collate(aggregate) result\n", itr);
439 mr->scan(show_points_w_clusters, (
void *)&kmeans);
446 fprintf(stderr,
"Iteration[%2d]: Collate(convert) result\n", itr);
447 mr->scan(show_points_w_clusters_m, (
void *)&kmeans);
/* Reduce step: each group of points becomes one averaged centre. */
451 mr->reduce(update_cluster, (
void *)&kmeans);
453 fprintf(stderr,
"Iteration[%2d]: Reduce result\n", itr);
454 mr->scan(show_points_w_clusters, (
void *)&kmeans);
461 fprintf(stderr,
"Iteration[%2d]: Gather & Broadcast result\n", itr);
462 mr->scan(show_points_w_clusters, (
void *)&kmeans);
/* Write the new centres back into kmeans.means for the next iteration. */
466 mr->map(mr, copy_center, (
void *)&kmeans);
471 if (measure_time(&tv_e) == -1) {
472 MPI_Abort(MPI_COMM_WORLD, 1);
475 double time_diff = calc_time_diff(&tv_s, &tv_e);
476 printf(
"Iteration[%2d]: Elapse time: %f\n", itr, time_diff);
481 if (measure_time(&tv_te) == -1) {
482 MPI_Abort(MPI_COMM_WORLD, 1);
486 double total_time = calc_time_diff(&tv_ts, &tv_te);
487 printf(
"Total elapse time: %f\n", total_time);
static void parse_args(char *, char *[])
Parses command parameters given for mapper and reducer arguments.