KMR
kmeans-kmr.c
1 /* K-Means in KMR (2013-09-19) */
2 
3 #include <stdio.h>
4 #include <stdlib.h>
5 #include <unistd.h>
6 #include <sys/time.h>
7 #include <limits.h>
8 #include <math.h>
9 #include <mpi.h>
10 #include "kmr.h"
11 
12 #define DEF_NUM_ITERATION 10
13 #define DEF_NUM_POINTS 10000
14 #define DEF_NUM_MEANS 100
15 #define DEF_DIM 3
16 #define DEF_GRID_SIZE 1000
17 
18 struct kmr_option kmr_inspect = { .inspect = 1 };
19 
20 typedef struct {
21  // number of k-means iteration
22  int n_iteration;
23  // size of grid
24  int grid_size;
25  // dimention of points
26  int dim;
27  // number of points
28  int n_points;
29  // points
30  int *points;
31  // number of means
32  int n_means;
33  // means
34  int *means;
35 } kmeans_t;
36 
37 static int
38 measure_time(struct timeval *tv)
39 {
40  MPI_Barrier(MPI_COMM_WORLD);
41  if (gettimeofday(tv, NULL) == -1) {
42  perror("gettimeofday");
43  return -1;
44  }
45  return 0;
46 }
47 
48 static double
49 calc_time_diff(struct timeval *tv_s, struct timeval *tv_e)
50 {
51  return ((double)tv_e->tv_sec - (double)tv_s->tv_sec)
52  + ((double)tv_e->tv_usec - (double)tv_s->tv_usec) /1000000.0;
53 }
54 
55 /* Parse commandline arguments */
56 static void
57 parse_args(int argc, char **argv, kmeans_t *kmeans)
58 {
59  int c;
60  extern char *optarg;
61 
62  while ((c = getopt(argc, argv, "i:d:c:p:s:")) != EOF) {
63  switch (c) {
64  case 'i':
65  kmeans->n_iteration = atoi(optarg);
66  break;
67  case 'd':
68  kmeans->dim = atoi(optarg);
69  break;
70  case 'c':
71  kmeans->n_means = atoi(optarg);
72  break;
73  case 'p':
74  kmeans->n_points = atoi(optarg);
75  break;
76  case 's':
77  kmeans->grid_size = atoi(optarg);
78  break;
79  case '?':
80  printf("Usage: %s -i <num iteration> -d <vector dimension> "
81  "-c <num clusters> -p <num points> -s <max value>\n", argv[0]);
82  MPI_Abort(MPI_COMM_WORLD, 1);
83  }
84  }
85 
86  if (kmeans->n_iteration <= 0 || kmeans->dim <= 0 || kmeans->n_means <= 0
87  || kmeans->n_points <= 0 || kmeans->grid_size <= 0) {
88  printf("Illegal argument value. All values must be numeric and "
89  "greater than 0\n");
90  MPI_Abort(MPI_COMM_WORLD, 1);
91  }
92 }
93 
94 #if 0
95 /* Show statistics of Key-Value */
96 static void
97 statistics_kv(int iteration, long kv_cnt)
98 {
99  int rank, nprocs;
100  long *data;
101 
102  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
103  MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
104  if (rank == 0) {
105  data = (long *)malloc((size_t)nprocs * sizeof(long));
106  } else {
107  data = NULL;
108  }
109  MPI_Gather(&kv_cnt, 1, MPI_LONG, data, 1, MPI_LONG, 0, MPI_COMM_WORLD);
110 
111  if (rank == 0) {
112  int i, min_idx, max_idx;
113  long sum = 0, min = LONG_MAX, max = 0;
114  for (i = 0; i < nprocs; i++) {
115  sum += data[i];
116  if (data[i] < min) {
117  min = data[i];
118  min_idx = i;
119  }
120  if (data[i] > max) {
121  max = data[i];
122  max_idx = i;
123  }
124  }
125  double average = (double)sum / nprocs;
126  double variance = 0.0;
127  for (i = 0; i < nprocs; i++) {
128  variance += ((double)data[i] - average) * ((double)data[i] - average);
129  }
130  variance /= nprocs;
131  double std_dev = sqrt(variance);
132 
133 #if 0
134  printf("Itr[%2d]: KV statistics\n"
135  "\tMin # of KVs is %10ld on Rank[%05d]\n"
136  "\tMax # of KVs is %10ld on Rank[%05d]\n"
137  "\tStd Dev of KVs is %f\n",
138  iteration, min, min_idx, max, max_idx, std_dev);
139 #endif
140  printf("%d,%ld,%ld,%f\n", iteration, min, max, std_dev);
141 
142  free(data);
143  }
144 }
145 #endif
146 
147 /* Generate the points. */
148 static void
149 generate_randoms(int *pts, int size, int max_val)
150 {
151  int i;
152  for (i = 0; i < size; i++)
153  pts[i] = (rand() % max_val);
154 }
155 
156 /* KMR map function
157  It copies multi-dimension points to KVS.
158 */
159 static int
160 generate_points(const struct kmr_kv_box kv,
161  const KMR_KVS *kvi, KMR_KVS *kvo, void *p, long i_)
162 {
163  int i;
164  kmeans_t *kmeans = (kmeans_t *)p;
165  int *points = kmeans->points;
166 
167  for (i = 0; i < kmeans->n_points * kmeans->dim; i += kmeans->dim) {
168  struct kmr_kv_box nkv = { .klen = (int)sizeof(long),
169  .vlen = kmeans->dim * (int)sizeof(int),
170  .k.i = i,
171  .v.p = (void *)&points[i] };
172  kmr_add_kv(kvo, nkv);
173  }
174 
175  return MPI_SUCCESS;
176 }
177 
178 /* calculate squared distance
179  */
180 static unsigned int
181 calc_sq_dist(int *v1, int *v2, int dim)
182 {
183  int i;
184  unsigned int sum = 0;
185  for (i = 0; i < dim; i++)
186  sum += (unsigned int)((v1[i] - v2[i]) * (v1[i] - v2[i]));
187  return sum;
188 }
189 
190 /* KMR map function
191  It calculates cluster of a point stored in kv.
192  It emits a Key-Value whose key is cluster id (index of array
193  "kmeans.means") and value is the point.
194 */
195 static int
196 calc_cluster(const struct kmr_kv_box kv,
197  const KMR_KVS *kvi, KMR_KVS *kvo, void *p, long i_)
198 {
199  int i;
200  kmeans_t *kmeans = (kmeans_t *)p;
201  int dim = kmeans->dim;
202  int *means = kmeans->means;
203  int n_means = kmeans->n_means;
204  int *point = (int *)kv.v.p;
205  int min_idx = 0;
206  unsigned int min_dist = calc_sq_dist(point, &means[0], dim);
207 
208  for (i = 1; i < n_means; i++) {
209  unsigned int dist = calc_sq_dist(point, &means[i * dim], dim);
210  if (dist < min_dist) {
211  min_idx = i;
212  min_dist = dist;
213  }
214  }
215  struct kmr_kv_box nkv = { .klen = (int)sizeof(long),
216  .vlen = dim * (int)sizeof(int),
217  .k.i = min_idx,
218  .v.p = (void *)point };
219  kmr_add_kv(kvo, nkv);
220 
221  return MPI_SUCCESS;
222 }
223 
224 /* KMR reduce function
225  It calculates center of clusters.
226  It emits a Key-Value whose key is cluster id and value is the new center.
227 */
228 static int
229 update_cluster(const struct kmr_kv_box kv[], const long n,
230  const KMR_KVS *kvi, KMR_KVS *kvo, void *p)
231 {
232  int i, j;
233  int cid = (int)kv[0].k.i;
234  kmeans_t *kmeans = (kmeans_t *)p;
235  int dim = kmeans->dim;
236  long sum[dim];
237  int average[dim];
238 
239  for (i = 0; i < dim; i++)
240  sum[i] = 0;
241  for (i = 0; i < n; i++) {
242  int *point = (int *)kv[i].v.p;
243  for (j = 0; j < dim; j++) {
244  sum[j] += point[j];
245  }
246  }
247  for (i = 0; i < dim; i++)
248  average[i] = (int)(sum[i] / n);
249 
250  struct kmr_kv_box nkv = { .klen = (int)sizeof(long),
251  .vlen = dim * (int)sizeof(int),
252  .k.i = cid,
253  .v.p = (void *)average };
254  kmr_add_kv(kvo, nkv);
255 
256  return MPI_SUCCESS;
257 }
258 
259 /* KMR map function
260  It copies centers of clusters stored in kv to "kmeans.means" array.
261 */
262 static int
263 copy_center(const struct kmr_kv_box kv,
264  const KMR_KVS *kvi, KMR_KVS *kvo, void *p, long i_)
265 {
266  int i;
267  int cid = (int)kv.k.i;
268  int *center = (int *)kv.v.p;
269  kmeans_t *kmeans = (kmeans_t *)p;
270  int dim = kmeans->dim;
271  int *target = &kmeans->means[cid * dim];
272 
273  for (i = 0; i < dim; i++)
274  target[i] = center[i];
275 
276  return MPI_SUCCESS;
277 }
278 
279 #if defined(FULL_DEBUG)
280 /* KMR map function
281  It shows points.
282 */
283 static int
284 show_points(const struct kmr_kv_box kv,
285  const KMR_KVS *kvi, KMR_KVS *kvo, void *p, long i_)
286 {
287  int i;
288  char buf[100], buf2[10];
289  kmeans_t *kmeans = (kmeans_t *)p;
290  int *point = (int *)kv.v.p;
291 
292  memset(buf, 0, strlen(buf));
293  for (i = 0; i < kmeans->dim; i++) {
294  sprintf(buf2, "%d ", point[i]);
295  strncat(buf, buf2, strlen(buf2));
296  }
297  fprintf(stderr, "( %s)\n", buf);
298 
299  return MPI_SUCCESS;
300 }
301 
302 /* KMR map function
303  It shows points with their cluster.
304 */
305 static int
306 show_points_w_clusters(const struct kmr_kv_box kv,
307  const KMR_KVS *kvi, KMR_KVS *kvo, void *p, long i_)
308 {
309  int i;
310  char buf[100], buf2[10];
311  kmeans_t *kmeans = (kmeans_t *)p;
312  int cid = (int)kv.k.i;
313  int *point = (int *)kv.v.p;
314  int rank;
315 
316  memset(buf, 0, strlen(buf));
317  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
318  for (i = 0; i < kmeans->dim; i++) {
319  sprintf(buf2, "%d ", point[i]);
320  strncat(buf, buf2, strlen(buf2));
321  }
322  fprintf(stderr, "RANK[%d]: %3d ( %s)\n", rank, cid, buf);
323 
324  return MPI_SUCCESS;
325 }
326 #endif
327 
328 #if defined(DEBUG) || defined(FULL_DEBUG)
329 static void
330 print_means(int *means, int size, int dim)
331 {
332  int i, j;
333  for (i = 0; i < size; i++) {
334  int *mean = &means[i * dim];
335  printf("( ");
336  for (j = 0; j < dim; j++)
337  printf("%d ", mean[j]);
338  printf(")\n");
339  }
340 }
341 #endif
342 
343 ////////////////////////////////////////////////////////////////////////////////
344 
345 int
346 main(int argc, char **argv)
347 {
348  kmeans_t kmeans;
349  int nprocs, rank;
350 
351  MPI_Init(&argc, &argv);
352  MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
353  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
354 
355  kmr_init();
356  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
357 
358  // Initialize using MPI functions
359  srand((unsigned int)((rank + 1) * getpid()));
360  if (rank == 0) {
361  kmeans.n_iteration = DEF_NUM_ITERATION;
362  kmeans.grid_size = DEF_GRID_SIZE;
363  kmeans.dim = DEF_DIM;
364  // TODO decide the number of points assigned to each procs randomly
365  kmeans.n_points = DEF_NUM_POINTS;
366  kmeans.n_means = DEF_NUM_MEANS;
367 
368  parse_args(argc, argv, &kmeans);
369 
370  printf("#### Configuration ###########################\n");
371  printf("Number of processes = %d\n", nprocs);
372  printf("Iteration = %d\n", kmeans.n_iteration);
373  printf("Dimension = %d\n", kmeans.dim);
374  printf("Number of clusters = %d\n", kmeans.n_means);
375  printf("Number of points = %d\n", kmeans.n_points);
376  printf("##############################################\n");
377  }
378  MPI_Bcast(&(kmeans.n_iteration), 1, MPI_INT, 0, MPI_COMM_WORLD);
379  MPI_Bcast(&(kmeans.grid_size), 1, MPI_INT, 0, MPI_COMM_WORLD);
380  MPI_Bcast(&(kmeans.dim), 1, MPI_INT, 0, MPI_COMM_WORLD);
381  MPI_Bcast(&(kmeans.n_points), 1, MPI_INT, 0, MPI_COMM_WORLD);
382  MPI_Bcast(&(kmeans.n_means), 1, MPI_INT, 0, MPI_COMM_WORLD);
383  // set initial centers randomly on rank 0
384  kmeans.means = (int *)malloc((size_t)kmeans.n_means * (size_t)kmeans.dim * sizeof(int));
385  if (rank == 0) {
386  generate_randoms(kmeans.means, kmeans.n_means * kmeans.dim,
387  kmeans.grid_size);
388  }
389  MPI_Bcast(kmeans.means, kmeans.n_means * kmeans.dim, MPI_INT,
390  0, MPI_COMM_WORLD);
391  // set points randomly on each rank
392  kmeans.points = (int *)malloc((size_t)kmeans.n_points * (size_t)kmeans.dim * sizeof(int));
393  generate_randoms(kmeans.points, kmeans.n_points * kmeans.dim,
394  kmeans.grid_size);
395 
396  struct timeval tv_ts, tv_te, tv_s, tv_e;
397  // measure kernel start time
398  if (measure_time(&tv_ts) == -1) {
399  MPI_Abort(MPI_COMM_WORLD, 1);
400  }
401 
402  // kernel //
403  int itr = 0;
404  while (itr < kmeans.n_iteration) {
405  KMR_KVS *kvs_points = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
406  kmr_map_once(kvs_points, (void *)&kmeans, kmr_noopt, 0, generate_points);
407  long kv_cnt;
408  kmr_local_element_count(kvs_points, &kv_cnt);
409  //statistics_kv(itr + 1, kv_cnt);
410 
411  // measure iteration start time
412  if (measure_time(&tv_s) == -1) {
413  MPI_Abort(MPI_COMM_WORLD, 1);
414  }
415 
416 #ifdef DEBUG
417  if (rank == 0) {
418  printf("Iteration[%2d]: Means\n", itr);
419  print_means(kmeans.means, kmeans.n_means, kmeans.dim);
420  }
421 #endif
422 #ifdef FULL_DEBUG
423  fprintf(stderr, "Iteration[%2d]: Points\n", itr);
424  kmr_map(kvs_points, NULL, (void *)&kmeans, kmr_inspect, show_points);
425 #endif
426 
427  // call kmr_map to calculate cluster
428  KMR_KVS *kvs_c2p = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
429  kmr_map(kvs_points, kvs_c2p, (void *)&kmeans, kmr_noopt, calc_cluster);
430 #ifdef FULL_DEBUG
431  fprintf(stderr, "Iteration[%2d]: Map result\n", itr);
432  kmr_map(kvs_c2p, NULL, (void *)&kmeans, kmr_inspect,
433  show_points_w_clusters);
434 #endif
435 
436  // call kmr_shuffle to gather points which belong to the same cluster
437  KMR_KVS *kvs_c2p_s = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
438  kmr_shuffle(kvs_c2p, kvs_c2p_s, kmr_noopt);
439 #ifdef FULL_DEBUG
440  fprintf(stderr, "Iteration[%2d]: Shuffle result\n", itr);
441  kmr_map(kvs_c2p_s, NULL, (void *)&kmeans, kmr_inspect,
442  show_points_w_clusters);
443 #endif
444 
445  // call kmr_reduce to update cluster center
446  KMR_KVS *kvs_cluster = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
447  kmr_reduce(kvs_c2p_s, kvs_cluster, (void *)&kmeans, kmr_noopt,
448  update_cluster);
449 #ifdef FULL_DEBUG
450  fprintf(stderr, "Iteration[%2d]: Reduce result\n", itr);
451  kmr_map(kvs_cluster, NULL, (void *)&kmeans, kmr_inspect,
452  show_points_w_clusters);
453 #endif
454 
455  // cal kmr_replicate to share new cluster centers
456  KMR_KVS *kvs_all_clusters =
457  kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
458  kmr_replicate(kvs_cluster, kvs_all_clusters, kmr_noopt);
459 #ifdef FULL_DEBUG
460  fprintf(stderr, "Iteration[%2d]: Replicate result\n", itr);
461  kmr_map(kvs_all_clusters, NULL, (void *)&kmeans, kmr_inspect,
462  show_points_w_clusters);
463 #endif
464 
465  // call kmr_map to copy new cluster centers to kmeans.means array.
466  kmr_map(kvs_all_clusters, NULL, (void *)&kmeans, kmr_noopt, copy_center);
467  itr += 1;
468 
469  // measure iteration end time
470  if (measure_time(&tv_e) == -1) {
471  MPI_Abort(MPI_COMM_WORLD, 1);
472  }
473  if (rank == 0) {
474  double time_diff = calc_time_diff(&tv_s, &tv_e);
475  printf("Iteration[%2d]: Elapse time: %f\n", itr, time_diff);
476  }
477  }
478 
479  // measure kernel end time
480  if (measure_time(&tv_te) == -1) {
481  MPI_Abort(MPI_COMM_WORLD, 1);
482  }
483 
484  if (rank == 0) {
485  double total_time = calc_time_diff(&tv_ts, &tv_te);
486  printf("Total elapse time: %f\n", total_time);
487  printf("Cluster corrdinates\n");
488  for (int i = 0; i < kmeans.n_means * kmeans.dim; i += kmeans.dim) {
489  printf(" ( ");
490  for (int j = 0; j < kmeans.dim; j++) {
491  printf("%d, ", kmeans.means[i + j]);
492  }
493  printf(")\n");
494  }
495  }
496 
497  kmr_free_context(mr);
498  kmr_fin();
499 
500  MPI_Finalize();
501  return 0;
502 }
Key-Value Stream (abstract).
Definition: kmr.h:587
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs.
Definition: kmr.h:88
Options to Mapping, Shuffling, and Reduction.
Definition: kmr.h:613
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
Definition: kmrbase.c:751
int kmr_map_once(KMR_KVS *kvo, void *arg, struct kmr_option opt, _Bool rank_zero_only, kmr_mapfn_t m)
Maps once.
Definition: kmrbase.c:1402
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
Definition: kmr.h:71
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
Definition: kmrbase.c:2036
KMR Context.
Definition: kmr.h:222
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Definition: kmr.h:82
Handy Copy of a Key-Value Field.
Definition: kmr.h:358
int kmr_fin(void)
Clears the environment.
Definition: kmrbase.c:124
#define kmr_init()
Sets up the environment.
Definition: kmr.h:747
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
Definition: kmrbase.c:326
KMR Interface.
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gat...
Definition: kmrbase.c:2182
static void parse_args(char *, char *[])
Parses command parameters given for mapper and reducer arguments.
Definition: kmrshell.c:257
int kmr_local_element_count(KMR_KVS *kvs, long *v)
Gets the number of key-value pairs locally on each rank.
Definition: kmrutil.c:349
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).
Definition: kmrbase.c:147