46 #include "map_reduce.h" 47 #include "stddefines.h" 54 #define MIN(X,Y) (((X) <= (Y)) ? (X) : (Y)) 78 #define dprintf(...) fprintf(stdout, __VA_ARGS__) 81 get_time(
struct timeval *tv)
90 assert((
size_t)kv.klen ==
sizeof(
mm_data_t));
93 mm_data.siz = mm->siz;
94 mm_data.siz = mm->siz;
95 mm_data.row_block_length = mm->row_block_length;
96 mm_data.unit_size = mm->unit_size;
97 size_t datasz = (
sizeof(
int) * (size_t)(mm_data.siz * mm_data.siz));
98 assert((
size_t)kv.vlen == datasz);
99 if (kvs0->c.mr->rank != 0) {
100 mm_data.B = malloc(
sizeof(
int) * (
size_t)(mm_data.siz * mm_data.siz));
101 assert(mm_data.B != 0);
103 memcpy(mm_data.B, (
void *)kv.v.p, datasz);
110 matrixmult_splitter(
int units,
int id,
KMR_KVS *kvo)
112 assert(mm_data.siz >= 0
113 && mm_data.unit_size >= 0
114 && mm_data.row_block_length >= 0
115 && mm_data.A != 0 && mm_data.B != 0 && mm_data.C != 0);
117 assert(mm_data.row_counter <= mm_data.siz);
118 if (mm_data.row_counter >= mm_data.siz) {
121 int n = MIN(units, (mm_data.siz - mm_data.row_counter));
123 size_t datasz = (
sizeof(int) * (
size_t)(mm_data.siz * n));
124 int *partA = malloc(datasz);
126 md.row = mm_data.row_counter;
129 mm_data.row_counter += n;
130 memcpy(partA, (mm_data.A + (mm_data.siz * md.row)), datasz);
133 .k.p = (
const char *)&md,
134 .v.p = (
const char *)partA};
143 assert(kvs0 == 0 && kv0.klen == 0 && kv0.vlen == 0 && kvo != 0);
144 mm_data.row_counter = 0;
146 while (matrixmult_splitter(mm_data.unit_size, (
int)
id, kvo)) {
159 int *partA = (
void *)kv.v.p;
161 size_t datasz = (
sizeof(int) * (
size_t)(mm_data.siz * n));
162 int *partC = malloc(datasz);
164 for (
int r = 0; r < md0->rows; r++) {
165 int *a = (partA + (r * mm_data.siz));
166 for (
int i = 0; i < mm_data.siz ; i++) {
167 int *b = (mm_data.B + i);
169 for (
int j = 0; j < mm_data.siz; j++) {
170 value += (a[j] * b[0]);
173 partC[r * mm_data.siz + i] = value;
182 .k.p = (
const char *)&md1,
183 .v.p = (
const char *)partC};
193 int *partC = (
void *)kv.v.p;
195 size_t datasz = (
sizeof(int) * (
size_t)(mm_data.siz * n));
196 memcpy((mm_data.C + (mm_data.siz * md0->row)), partC, datasz);
200 int main(
int argc,
char *argv[])
205 struct timeval begin, end;
209 int nprocs, rank, thlv;
210 MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &thlv);
212 MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
213 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
219 if (argc <= 1 || argv[1] == 0) {
220 dprintf(
"USAGE: %s matrix-size [row-block-size] [no-create-files]\n",
222 MPI_Abort(MPI_COMM_WORLD, 1);
225 char *fnameA =
"matrix_file_A.txt";
226 char *fnameB =
"matrix_file_B.txt";
227 int matrix_len = atoi(argv[1]);
228 assert(matrix_len > 0);
229 size_t file_size = (
sizeof(int) * (
size_t)(matrix_len * matrix_len));
230 fprintf(stderr,
"***** file size is %ld\n", file_size);
232 int row_block_length;
234 row_block_length = 1;
236 row_block_length = atoi(argv[2]);
237 assert(row_block_length > 0);
245 printf(
"MatrixMult: Side of the matrix is %d\n", matrix_len);
246 printf(
"MatrixMult: Row Block Len is %d\n", row_block_length);
247 printf(
"MatrixMult: Running...\n");
252 dprintf(
"Creating files\n");
254 FILE *fileA = fopen(fnameA,
"w");
256 FILE *fileB = fopen(fnameB,
"w");
258 for (
int i = 0; i < matrix_len; i++) {
259 for (
int j = 0; j < matrix_len; j++) {
260 value = (rand() % 11);
261 size_t cx = fwrite(&value,
sizeof(
int), 1, fileA);
265 for (
int i = 0; i < matrix_len; i++) {
266 for (
int j = 0; j < matrix_len; j++) {
267 value = (rand() % 11);
268 size_t cx = fwrite(&value,
sizeof(
int), 1, fileB);
278 struct stat finfoA, finfoB;
280 int fdA = open(fnameA, O_RDONLY);
282 cc = fstat(fdA, &finfoA);
284 int *fdataA = malloc(file_size);
286 ssize_t cx0 = read(fdA, fdataA, file_size);
287 assert(cx0 == (ssize_t)file_size);
289 int fdB = open(fnameB, O_RDONLY);
291 cc = fstat(fdB, &finfoB);
293 int *fdataB = malloc(file_size);
295 ssize_t cx1 = read(fdB, fdataB, file_size);
296 assert(cx1 == (ssize_t)file_size);
298 mm_data.siz = matrix_len;
299 mm_data.row_block_length = row_block_length;
300 mm_data.unit_size = (int)(
sizeof(
int)
301 * (size_t)(row_block_length * matrix_len));
302 mm_data.A = (
int *)fdataA;
303 mm_data.B = (
int *)fdataB;
304 mm_data.C = malloc(
sizeof(
int) * (
size_t)(mm_data.siz * mm_data.siz));
305 assert(mm_data.C != 0);
310 data_size = (off_t)file_size;
317 map_reduce_args_t map_reduce_args;
318 memset(&map_reduce_args, 0,
sizeof(map_reduce_args_t));
319 map_reduce_args.task_data = &mm_data;
320 map_reduce_args.map = mm_map;
321 map_reduce_args.reduce = NULL;
322 map_reduce_args.splitter = matrixmult_splitter;
323 map_reduce_args.locator = matrixmult_locator;
324 map_reduce_args.key_cmp = myintcmp;
325 map_reduce_args.unit_size = mm_data.unit_size;
326 map_reduce_args.partition = NULL;
327 map_reduce_args.result = &mm_vals;
328 map_reduce_args.data_size = file_size;
329 map_reduce_args.L1_cache_size = atoi(GETENV(
"MR_L1CACHESIZE"));
330 map_reduce_args.num_map_threads = atoi(GETENV(
"MR_NUMTHREADS"));
331 map_reduce_args.num_reduce_threads = atoi(GETENV(
"MR_NUMTHREADS"));
332 map_reduce_args.num_merge_threads = atoi(GETENV(
"MR_NUMTHREADS"));
333 map_reduce_args.num_procs = atoi(GETENV(
"MR_NUMPROCS"));
334 map_reduce_args.key_match_factor = (float)atof(GETENV(
"MR_KEYMATCHFACTOR"));
341 struct kmr_kv_box nkv = {.klen =
sizeof(long),
342 .vlen =
sizeof(
long),
350 struct kmr_kv_box kv0 = {.klen =
sizeof(long), .k.i = 0};
353 assert(cc == MPI_SUCCESS);
355 assert(mm_data.siz == kv1.v.i);
357 mm_data.siz = (int)kv1.v.i;
359 mr->preset_block_size = (5 *
sizeof(int) * (
size_t)(mm_data.siz * mm_data.siz));
362 fprintf(stderr,
"***** data size is %" PRIdPTR
"\n", (intptr_t)data_size);
363 printf(
"MatrixMult: Calling MapReduce Scheduler Matrix Multiplication\n");
370 fprintf(stderr,
"initialize: %u\n", time_diff(&end, &begin));
379 size_t datasz = (
sizeof(int) * (
size_t)(mm_data.siz * mm_data.siz));
382 .k.p = (
const char *)&mm_data,
383 .v.p = (
const char *)mm_data.B};
389 kmr_map(cnf1, 0, 0, kmr_noopt, mm_setup);
397 kmr_map(kvs1, kvs2, 0, kmr_noopt, mm_map);
400 struct kmr_option rankzero = {.rank_zero = 1};
402 kmr_map(kvs3, 0, 0, kmr_noopt, mm_output);
408 fprintf(stderr,
"library: %u\n", time_diff(&end, &begin));
421 for (
int i = 0; i < (mm_data.siz * mm_data.siz); i++) {
424 dprintf(
"MatrixMult: total sum is %d\n", sum);
425 dprintf(
"MatrixMult: MapReduce Completed\n");
429 for (
int i = 0; i < mm_data.siz; i++) {
430 for (
int j = 0; j < mm_data.siz; j++) {
432 for (
int k = 0; k < mm_data.siz; k++) {
433 int a = mm_data.A[i * mm_data.siz + k];
434 int b = mm_data.B[k * mm_data.siz + j];
437 assert(mm_data.C[i * mm_data.siz + j] == v);
454 fprintf(stderr,
"finalize: %u\n", time_diff(&end, &begin));
Key-Value Stream (abstract).
Options to Mapping, Shuffling, and Reduction.
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
int kmr_find_key(KMR_KVS *kvi, struct kmr_kv_box ki, struct kmr_kv_box *vo)
Finds a key-value pair for a key.
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Handy Copy of a Key-Value Field.
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gat...
int kmr_map_on_rank_zero(KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps on rank0 only.
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).