44 #include "stddefines.h" 45 #include "map_reduce.h" 50 #define DEF_NUM_POINTS 100000 51 #define DEF_NUM_MEANS 100 53 #define DEF_GRID_SIZE 1000 89 #define dprintf(...) fprintf(stdout, __VA_ARGS__) 92 get_time(
struct timeval *tv)
100 dump_means(
int *_,
int size)
102 for (
int i = 0; i < size; i++) {
103 for (
int j = 0; j < dim; j++) {
104 dprintf(
"%5d ", means[i * dim + j]);
115 assert(s != 0 && s[0] != 0);
116 cc = sscanf(s,
"%d%c", &v, &gomi);
130 num_points = DEF_NUM_POINTS;
131 num_means = DEF_NUM_MEANS;
133 grid_size = DEF_GRID_SIZE;
135 while ((c = getopt(argc, argv,
"d:c:p:s:")) != EOF) {
150 printf(
"Usage: %s -d <vector dimension> -c <num clusters> -p <num points> -s <max value>\n", argv[0]);
155 if (dim <= 0 || num_means <= 0 || num_points <= 0 || grid_size <= 0) {
156 printf(
"Illegal argument value. All values must be numeric and greater than 0\n");
160 printf(
"Dimension = %d\n", dim);
161 printf(
"Number of clusters = %d\n", num_means);
162 printf(
"Number of points = %d\n", num_points);
163 printf(
"Size of each dimension = %d\n", grid_size);
170 assert(kvs == 0 && kv.klen == 0 && kv.vlen == 0);
171 assert(kvo->c.mr->rank == 0);
174 snprintf(v, 256,
"%d", num_points);
176 snprintf(v, 256,
"%d", num_means);
178 snprintf(v, 256,
"%d", dim);
180 snprintf(v, 256,
"%d", grid_size);
184 int klen0 = (int)(strlen(k0) + 1);
186 ((
int)
sizeof(
int) * num_points * dim));
189 int klen1 = (int)(strlen(k1) + 1);
191 ((
int)
sizeof(
int) * num_means * dim));
202 assert(cc == MPI_SUCCESS);
205 assert(cc == MPI_SUCCESS);
208 assert(cc == MPI_SUCCESS);
211 assert(cc == MPI_SUCCESS);
222 assert(cc == MPI_SUCCESS);
223 points = malloc(
sizeof(
int) * (
size_t)(num_points * dim));
225 memcpy(points, s, (
sizeof(
int) * (
size_t)(num_points * dim)));
228 assert(cc == MPI_SUCCESS);
229 means = malloc(
sizeof(
int) * (
size_t)(num_means * dim));
231 memcpy(means, s, (
sizeof(
int) * (
size_t)(num_means * dim)));
237 generate_points(
int *pts,
int size)
239 for (
int i = 0; i < size; i++) {
240 for (
int j = 0; j < dim; j++) {
241 pts[i * dim + j] = (rand() % grid_size);
248 static inline unsigned int 249 get_sq_dist(
int *v1,
int *v2)
253 for (
int i = 0; i < dim; i++) {
256 sum += (
unsigned int)((p1 - p2) * (p1 - p2));
264 add_to_sum(
int *sum,
int *point)
266 for (
int i = 0; i < dim; i++) {
277 kmeans_splitter(
int units,
int id,
KMR_KVS *kvo)
280 assert(points != 0 && means != 0 && clusters != 0);
281 assert(units > 0 && kvo != 0);
283 if (next_point >= num_points) {
289 if ((next_point + units) < num_points) {
292 w = (num_points - next_point);
302 memcpy(partofclusters, &clusters[s], (
sizeof(
int) * (
size_t)w));
303 struct kmr_kv_box kv = {.klen = (int)
sizeof(
long),
316 assert(kvs0 == 0 && kv0.klen == 0 && kv0.vlen == 0 && kvo != 0);
317 const int DEFAULT_CACHE_SIZE = (64 * 1024);
318 int chunk_size = (DEFAULT_CACHE_SIZE / unit_size);
321 while (kmeans_splitter(chunk_size,
id, kvo)) {
331 #define LOOSE_CAST(X) ((void *)(X)) 337 find_clusters(
int start,
int length,
int *partofclusters,
KMR_KVS *kvo)
340 for (
int i = 0; i < length; i++) {
342 unsigned int mindist = get_sq_dist(&points[p * dim], &means[0]);
344 for (
int j = 1; j < num_means; j++) {
345 unsigned int d = get_sq_dist(&points[p * dim], &means[j * dim]);
351 if (partofclusters[i] != minidx) {
352 partofclusters[i] = minidx;
355 struct kmr_kv_box nkv = {.klen = (int)
sizeof(
long),
356 .vlen = (int)(
sizeof(
int) * (size_t)dim),
358 .v.p = (
const char *)(&points[p * dim])};
372 assert((
size_t)kv.klen ==
sizeof(
long)
376 find_clusters(md->start, md->length, partofclusters, kvo);
379 .vlen = (int)(
sizeof(
int) * (size_t)md->length),
381 .v.p = (
void *)partofclusters};
390 reduce_to_average(
const struct kmr_kv_box kv[],
const long n,
394 long meanindex = kv[0].k.i;
396 for (
int i = 0; i < dim; i++) {
399 for (
int i = 0; i < n; i++) {
400 assert((
size_t)kv[i].vlen == (
sizeof(
int) * (
size_t)dim));
401 assert(kv[0].k.i == meanindex);
402 int *point = (
int *)kv[i].v.p;
403 add_to_sum(ave, point);
405 for (
int i = 0; i < dim; i++) {
406 ave[i] = (int)(ave[i] / n);
408 struct kmr_kv_box nkv = {.klen = (int)
sizeof(
long),
409 .vlen = (int)(
sizeof(
int) * (size_t)dim),
422 assert((
size_t)kv.klen ==
sizeof(
long)
423 && (
size_t)kv.vlen == (
sizeof(
int) * (
size_t)dim));
424 long meanindex = kv.k.i;
425 int *point = (
void *)kv.v.p;
426 int *m = &means[meanindex * dim];
427 for (
int i = 0; i < dim; i++) {
439 && (
size_t)kv.vlen == (
sizeof(
int) * (
size_t)md->length));
440 int *partofclusters = (
void *)kv.v.p;
441 for (
int i = 0; i < md->length; i++) {
442 clusters[md->start + i] = partofclusters[i];
451 check_modified(
const struct kmr_kv_box kv[],
const long n,
457 for (
int i = 0; i < n; i++) {
458 assert(kv[i].k.i == 0);
466 main(
int argc,
char **argv)
470 int nprocs, rank, thlv;
471 MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &thlv);
473 MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
474 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
478 mr->preset_block_size = (6 * 1024 * 1024);
480 struct timeval begin, end;
482 unsigned int library_time = 0;
483 unsigned int inter_library_time = 0;
494 points = malloc(
sizeof(
int) * (
size_t)(num_points * dim));
496 generate_points(points, num_points);
498 means = malloc(
sizeof(
int) * (
size_t)(num_means * dim));
500 generate_points(means, num_means);
510 unit_size = (int)(
sizeof(
int) * (size_t)dim);
512 clusters = malloc(
sizeof(
int) * (
size_t)num_points);
513 assert(clusters != 0);
514 memset(clusters, -1, (
sizeof(
int) * (
size_t)num_points));
522 memset(&map_reduce_args, 0,
sizeof(map_reduce_args_t));
523 map_reduce_args.task_data = &kmeans_data;
524 map_reduce_args.map = kmeans_map;
525 map_reduce_args.reduce = kmeans_reduce;
526 map_reduce_args.splitter = kmeans_splitter;
527 map_reduce_args.locator = kmeans_locator;
528 map_reduce_args.key_cmp = mykeycmp;
529 map_reduce_args.unit_size = kmeans_data.unit_size;
530 map_reduce_args.partition = NULL;
531 map_reduce_args.result = &kmeans_vals;
532 map_reduce_args.data_size = (num_points + num_means) * dim *
sizeof(
int);
533 map_reduce_args.L1_cache_size = atoi(GETENV(
"MR_L1CACHESIZE"));
534 map_reduce_args.num_map_threads = atoi(GETENV(
"MR_NUMTHREADS"));
535 map_reduce_args.num_reduce_threads = atoi(GETENV(
"MR_NUMTHREADS"));
536 map_reduce_args.num_merge_threads = atoi(GETENV(
"MR_NUMTHREADS"));
537 map_reduce_args.num_procs = atoi(GETENV(
"MR_NUMPROCS"));
538 map_reduce_args.key_match_factor = (float)atof(GETENV(
"MR_KEYMATCHFACTOR"));
539 map_reduce_args.use_one_queue_per_task =
true;
543 dprintf(
"KMeans: Calling MapReduce Scheduler\n");
549 fprintf(stderr,
"initialize: %u\n", time_diff(&end, &begin));
552 while (modified > 0) {
554 dprintf(
"number of modified cluster points=%d\n", modified);
571 kmr_map(kvs1, kvs2, (
void *)kvs3, kmr_noopt, kmeans_map);
578 kmr_reduce(kvs4, kvs5, 0, kmr_noopt, reduce_to_average);
584 kmr_map(kvs6, 0, 0, kmr_noopt, store_means);
588 struct kmr_option rankzero = {.rank_zero = 1};
591 kmr_map(kvs7, 0, 0, kmr_noopt, store_clusters);
597 .vlen =
sizeof(
long),
604 kmr_reduce(kvs9, 0, 0, kmr_noopt, check_modified);
609 library_time += time_diff(&end, &begin);
613 inter_library_time += time_diff(&end, &end);
619 fprintf(stderr,
"library: %u\n", library_time);
620 fprintf(stderr,
"inter library: %u\n", inter_library_time);
630 printf(
"KMeans: MapReduce Completed\n");
632 dprintf(
"\n\nFinal means:\n");
633 dump_means(means, num_means);
637 dprintf(
"Cleaning up\n");
650 fprintf(stderr,
"finalize: %u\n", time_diff(&end, &begin));
Key-Value Stream (abstract).
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs.
Options to Mapping, Shuffling, and Reduction.
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
int kmr_add_kv1(KMR_KVS *kvs, void *k, int klen, void *v, int vlen)
Adds a key-value pair as given directly by a pointer.
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
int kmr_find_string(KMR_KVS *kvi, const char *k, const char **vq)
Finds the key K in the key-value stream KVS.
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Handy Copy of a Key-Value Field.
static int safe_atoi(const char *s)
Parses an integer string.
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gather.
static void parse_args(char *, char *[])
Parses command parameters given for mapper and reducer arguments.
int kmr_add_string(KMR_KVS *kvs, const char *k, const char *v)
Adds a key-value pair of strings.
int kmr_map_on_rank_zero(KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps on rank0 only.
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).