KMR
phoenix-matrix-multiply.c
1 /* Copyright (c) 2007-2009, Stanford University
2 * All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions are met:
6 * * Redistributions of source code must retain the above copyright
7 * notice, this list of conditions and the following disclaimer.
8 * * Redistributions in binary form must reproduce the above copyright
9 * notice, this list of conditions and the following disclaimer in the
10 * documentation and/or other materials provided with the distribution.
11 * * Neither the name of Stanford University nor the names of its
12 * contributors may be used to endorse or promote products derived from
13 * this software without specific prior written permission.
14 *
15 * THIS SOFTWARE IS PROVIDED BY STANFORD UNIVERSITY ``AS IS'' AND ANY
16 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
18 * DISCLAIMED. IN NO EVENT SHALL STANFORD UNIVERSITY BE LIABLE FOR ANY
19 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
20 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
21 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
22 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
24 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25 */
26 
27 /* This is an example in "test" directory in Phoenix-2.0.0, converted
28  for KMR (2013-04-24). REWRITTEN MUCH. */
29 
30 #include <mpi.h>
31 #include <stdio.h>
32 #include <strings.h>
33 #include <string.h>
34 #include <stddef.h>
35 #include <stdlib.h>
36 #include <unistd.h>
37 #include <assert.h>
38 #include <sys/mman.h>
39 #include <sys/stat.h>
40 #include <fcntl.h>
41 #include <ctype.h>
42 #include <time.h>
43 #include <inttypes.h>
44 #include <sys/time.h>
45 #if 0
46 #include "map_reduce.h"
47 #include "stddefines.h"
48 #endif
49 #include "kmr.h"
50 
51 /* KMR NOTE: (0) Matrix A is row-split, matrix B is replicated. (1)
52  Remove mmap code. (2) MM_DATA be made global. */
53 
54 #define MIN(X,Y) (((X) <= (Y)) ? (X) : (Y))
55 
56 /* ROW is the start and ROWS is the number of rows of a task. Matrix
57  A and C are only on rank0. */
58 
59 typedef struct {
60  int siz;
61  int row_block_length;
62  int unit_size;
63  int row_counter;
64  int *A;
65  int *B;
66  int *C;
67 } mm_data_t;
68 
69 mm_data_t mm_data;
70 
71 typedef struct {
72  int row;
73  int rows;
74  long id;
75  /* (part of rows of matrix A or C) */
76 } mm_mapdata_t;
77 
78 #define dprintf(...) fprintf(stdout, __VA_ARGS__)
79 
80 static inline void
81 get_time(struct timeval *tv)
82 {
83  gettimeofday(tv, 0);
84 }
85 
86 static int
87 mm_setup(const struct kmr_kv_box kv,
88  const KMR_KVS *kvs0, KMR_KVS *kvo, void *p, long i_)
89 {
90  assert((size_t)kv.klen == sizeof(mm_data_t));
91  /*memcpy(&mm_data, (void *)kv.k.p, sizeof(mm_data_t));*/
92  mm_data_t *mm = (void *)kv.k.p;
93  mm_data.siz = mm->siz;
94  mm_data.siz = mm->siz;
95  mm_data.row_block_length = mm->row_block_length;
96  mm_data.unit_size = mm->unit_size;
97  size_t datasz = (sizeof(int) * (size_t)(mm_data.siz * mm_data.siz));
98  assert((size_t)kv.vlen == datasz);
99  if (kvs0->c.mr->rank != 0) {
100  mm_data.B = malloc(sizeof(int) * (size_t)(mm_data.siz * mm_data.siz));
101  assert(mm_data.B != 0);
102  }
103  memcpy(mm_data.B, (void *)kv.v.p, datasz);
104  return MPI_SUCCESS;
105 }
106 
107 /* Assigns rows to each task. It runs on rank0. */
108 
109 static int
110 matrixmult_splitter(int units, int id, KMR_KVS *kvo)
111 {
112  assert(mm_data.siz >= 0
113  && mm_data.unit_size >= 0
114  && mm_data.row_block_length >= 0
115  && mm_data.A != 0 && mm_data.B != 0 && mm_data.C != 0);
116 
117  assert(mm_data.row_counter <= mm_data.siz);
118  if (mm_data.row_counter >= mm_data.siz) {
119  return 0;
120  }
121  int n = MIN(units, (mm_data.siz - mm_data.row_counter));
122  mm_mapdata_t md;
123  size_t datasz = (sizeof(int) * (size_t)(mm_data.siz * n));
124  int *partA = malloc(datasz);
125  assert(partA != 0);
126  md.row = mm_data.row_counter;
127  md.rows = n;
128  md.id = id;
129  mm_data.row_counter += n;
130  memcpy(partA, (mm_data.A + (mm_data.siz * md.row)), datasz);
131  struct kmr_kv_box nkv = {.klen = sizeof(mm_mapdata_t),
132  .vlen = (int)datasz,
133  .k.p = (const char *)&md,
134  .v.p = (const char *)partA};
135  kmr_add_kv(kvo, nkv);
136  return 1;
137 }
138 
139 static int
140 phoenix_split(const struct kmr_kv_box kv0,
141  const KMR_KVS *kvs0, KMR_KVS *kvo, void *p, long i_)
142 {
143  assert(kvs0 == 0 && kv0.klen == 0 && kv0.vlen == 0 && kvo != 0);
144  mm_data.row_counter = 0;
145  long id = 0;
146  while (matrixmult_splitter(mm_data.unit_size, (int)id, kvo)) {
147  id++;
148  }
149  return MPI_SUCCESS;
150 }
151 
152 /* Multiplies the allocated regions of matrix to compute partial sums. */
153 
154 static int
155 mm_map(const struct kmr_kv_box kv,
156  const KMR_KVS *kvs, KMR_KVS *kvo, void *p, long i_)
157 {
158  mm_mapdata_t *md0 = (mm_mapdata_t *)kv.k.p;
159  int *partA = (void *)kv.v.p;
160  int n = md0->rows;
161  size_t datasz = (sizeof(int) * (size_t)(mm_data.siz * n));
162  int *partC = malloc(datasz);
163  assert(partC != 0);
164  for (int r = 0; r < md0->rows; r++) {
165  int *a = (partA + (r * mm_data.siz));
166  for (int i = 0; i < mm_data.siz ; i++) {
167  int *b = (mm_data.B + i);
168  int value = 0;
169  for (int j = 0; j < mm_data.siz; j++) {
170  value += (a[j] * b[0]);
171  b += mm_data.siz;
172  }
173  partC[r * mm_data.siz + i] = value;
174  }
175  }
176  mm_mapdata_t md1;
177  md1.row = md0->row;
178  md1.rows = n;
179  md1.id = md0->id;
180  struct kmr_kv_box nkv = {.klen = sizeof(mm_mapdata_t),
181  .vlen = (int)datasz,
182  .k.p = (const char *)&md1,
183  .v.p = (const char *)partC};
184  kmr_add_kv(kvo, nkv);
185  return MPI_SUCCESS;
186 }
187 
188 static int
189 mm_output(const struct kmr_kv_box kv,
190  const KMR_KVS *kvs, KMR_KVS *kvo, void *p, long i_)
191 {
192  mm_mapdata_t *md0 = (mm_mapdata_t *)kv.k.p;
193  int *partC = (void *)kv.v.p;
194  int n = md0->rows;
195  size_t datasz = (sizeof(int) * (size_t)(mm_data.siz * n));
196  memcpy((mm_data.C + (mm_data.siz * md0->row)), partC, datasz);
197  return MPI_SUCCESS;
198 }
199 
200 int main(int argc, char *argv[])
201 {
202  off_t data_size;
203  int cc;
204 
205  struct timeval begin, end;
206 
207  srand(20120930);
208 
209  int nprocs, rank, thlv;
210  MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &thlv);
211  /*MPI_Init(&argc, &argv);*/
212  MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
213  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
214  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
215 
216  get_time(&begin);
217 
218  if (rank == 0) {
219  if (argc <= 1 || argv[1] == 0) {
220  dprintf("USAGE: %s matrix-size [row-block-size] [no-create-files]\n",
221  argv[0]);
222  MPI_Abort(MPI_COMM_WORLD, 1);
223  }
224 
225  char *fnameA = "matrix_file_A.txt";
226  char *fnameB = "matrix_file_B.txt";
227  int matrix_len = atoi(argv[1]);
228  assert(matrix_len > 0);
229  size_t file_size = (sizeof(int) * (size_t)(matrix_len * matrix_len));
230  fprintf(stderr, "***** file size is %ld\n", file_size);
231 
232  int row_block_length;
233  if (argv[2] == 0) {
234  row_block_length = 1;
235  } else {
236  row_block_length = atoi(argv[2]);
237  assert(row_block_length > 0);
238  }
239  int create_files;
240  if (argv[3] != 0) {
241  create_files = 1;
242  } else {
243  create_files = 0;
244  }
245  printf("MatrixMult: Side of the matrix is %d\n", matrix_len);
246  printf("MatrixMult: Row Block Len is %d\n", row_block_length);
247  printf("MatrixMult: Running...\n");
248 
249  /* If the matrix files do not exist, create and fill them. */
250 
251  if (create_files) {
252  dprintf("Creating files\n");
253  int value = 0;
254  FILE *fileA = fopen(fnameA, "w");
255  assert(fileA != 0);
256  FILE *fileB = fopen(fnameB, "w");
257  assert(fileB != 0);
258  for (int i = 0; i < matrix_len; i++) {
259  for (int j = 0; j < matrix_len; j++) {
260  value = (rand() % 11);
261  size_t cx = fwrite(&value, sizeof(int), 1, fileA);
262  assert(cx == 1);
263  }
264  }
265  for (int i = 0; i < matrix_len; i++) {
266  for (int j = 0; j < matrix_len; j++) {
267  value = (rand() % 11);
268  size_t cx = fwrite(&value, sizeof(int), 1, fileB);
269  assert(cx == 1);
270  }
271  }
272  fclose(fileA);
273  fclose(fileB);
274  }
275 
276  /* Read in the files. */
277 
278  struct stat finfoA, finfoB;
279 
280  int fdA = open(fnameA, O_RDONLY);
281  assert(fdA >= 0);
282  cc = fstat(fdA, &finfoA);
283  assert(cc == 0);
284  int *fdataA = malloc(file_size);
285  assert(fdataA != 0);
286  ssize_t cx0 = read(fdA, fdataA, file_size);
287  assert(cx0 == (ssize_t)file_size);
288 
289  int fdB = open(fnameB, O_RDONLY);
290  assert(fdB >= 0);
291  cc = fstat(fdB, &finfoB);
292  assert(cc == 0);
293  int *fdataB = malloc(file_size);
294  assert(fdataB != 0);
295  ssize_t cx1 = read(fdB, fdataB, file_size);
296  assert(cx1 == (ssize_t)file_size);
297 
298  mm_data.siz = matrix_len;
299  mm_data.row_block_length = row_block_length;
300  mm_data.unit_size = (int)(sizeof(int)
301  * (size_t)(row_block_length * matrix_len));
302  mm_data.A = (int *)fdataA;
303  mm_data.B = (int *)fdataB;
304  mm_data.C = malloc(sizeof(int) * (size_t)(mm_data.siz * mm_data.siz));
305  assert(mm_data.C != 0);
306 
307  close(fdA);
308  close(fdB);
309 
310  data_size = (off_t)file_size;
311  }
312 
313  /*map_reduce_init());*/
314 
315 #if 0
316  // Setup map reduce args
317  map_reduce_args_t map_reduce_args;
318  memset(&map_reduce_args, 0, sizeof(map_reduce_args_t));
319  map_reduce_args.task_data = &mm_data;
320  map_reduce_args.map = mm_map;
321  map_reduce_args.reduce = NULL;
322  map_reduce_args.splitter = matrixmult_splitter;
323  map_reduce_args.locator = matrixmult_locator;
324  map_reduce_args.key_cmp = myintcmp;
325  map_reduce_args.unit_size = mm_data.unit_size;
326  map_reduce_args.partition = NULL; // use default
327  map_reduce_args.result = &mm_vals;
328  map_reduce_args.data_size = file_size;
329  map_reduce_args.L1_cache_size = atoi(GETENV("MR_L1CACHESIZE"));//1024 * 8;
330  map_reduce_args.num_map_threads = atoi(GETENV("MR_NUMTHREADS"));//8;
331  map_reduce_args.num_reduce_threads = atoi(GETENV("MR_NUMTHREADS"));//16;
332  map_reduce_args.num_merge_threads = atoi(GETENV("MR_NUMTHREADS"));//8;
333  map_reduce_args.num_procs = atoi(GETENV("MR_NUMPROCS"));//16;
334  map_reduce_args.key_match_factor = (float)atof(GETENV("MR_KEYMATCHFACTOR"));//2;
335 #endif
336 
337  /* Enlarge element size of key-value pairs. */
338 
339  KMR_KVS *siz0 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
340  if (rank == 0) {
341  struct kmr_kv_box nkv = {.klen = sizeof(long),
342  .vlen = sizeof(long),
343  .k.i = 0,
344  .v.i = mm_data.siz};
345  kmr_add_kv(siz0, nkv);
346  }
347  kmr_add_kv_done(siz0);
348  KMR_KVS *siz1 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
349  kmr_replicate(siz0, siz1, kmr_noopt);
350  struct kmr_kv_box kv0 = {.klen = sizeof(long), .k.i = 0};
351  struct kmr_kv_box kv1;
352  cc = kmr_find_key(siz1, kv0, &kv1);
353  assert(cc == MPI_SUCCESS);
354  if (rank == 0) {
355  assert(mm_data.siz == kv1.v.i);
356  }
357  mm_data.siz = (int)kv1.v.i;
358  kmr_free_kvs(siz1);
359  mr->preset_block_size = (5 * sizeof(int) * (size_t)(mm_data.siz * mm_data.siz));
360 
361  if (rank == 0) {
362  fprintf(stderr, "***** data size is %" PRIdPTR "\n", (intptr_t)data_size);
363  printf("MatrixMult: Calling MapReduce Scheduler Matrix Multiplication\n");
364  }
365 
366  get_time(&end);
367 
368 #ifdef TIMING
369  if (rank == 0) {
370  fprintf(stderr, "initialize: %u\n", time_diff(&end, &begin));
371  }
372 #endif
373 
374  get_time(&begin);
375  /*map_reduce(&map_reduce_args);*/
376 
377  KMR_KVS *cnf0 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
378  if (rank == 0) {
379  size_t datasz = (sizeof(int) * (size_t)(mm_data.siz * mm_data.siz));
380  struct kmr_kv_box nkv = {.klen = sizeof(mm_data_t),
381  .vlen = (int)datasz,
382  .k.p = (const char *)&mm_data,
383  .v.p = (const char *)mm_data.B};
384  kmr_add_kv(cnf0, nkv);
385  }
386  kmr_add_kv_done(cnf0);
387  KMR_KVS *cnf1 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
388  kmr_replicate(cnf0, cnf1, kmr_noopt);
389  kmr_map(cnf1, 0, 0, kmr_noopt, mm_setup);
390 
391  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
392  kmr_map_on_rank_zero(kvs0, 0, kmr_noopt, phoenix_split);
393  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
394  kmr_shuffle(kvs0, kvs1, kmr_noopt);
395 
396  KMR_KVS *kvs2 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
397  kmr_map(kvs1, kvs2, 0, kmr_noopt, mm_map);
398 
399  KMR_KVS *kvs3 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
400  struct kmr_option rankzero = {.rank_zero = 1};
401  kmr_replicate(kvs2, kvs3, rankzero);
402  kmr_map(kvs3, 0, 0, kmr_noopt, mm_output);
403 
404  get_time(&end);
405 
406 #ifdef TIMING
407  if (rank == 0) {
408  fprintf(stderr, "library: %u\n", time_diff(&end, &begin));
409  }
410 #endif
411 
412  get_time(&begin);
413 
414  /*map_reduce_finalize();*/
415 
416  //dprintf("\n");
417  //dprintf("The length of the final output is %d\n",mm_vals.length);
418 
419  if (rank == 0) {
420  int sum = 0;
421  for (int i = 0; i < (mm_data.siz * mm_data.siz); i++) {
422  sum += mm_data.C[i];
423  }
424  dprintf("MatrixMult: total sum is %d\n", sum);
425  dprintf("MatrixMult: MapReduce Completed\n");
426  }
427 
428  if (rank == 0) {
429  for (int i = 0; i < mm_data.siz; i++) {
430  for (int j = 0; j < mm_data.siz; j++) {
431  int v = 0;
432  for (int k = 0; k < mm_data.siz; k++) {
433  int a = mm_data.A[i * mm_data.siz + k];
434  int b = mm_data.B[k * mm_data.siz + j];
435  v += (a * b);
436  }
437  assert(mm_data.C[i * mm_data.siz + j] == v);
438  }
439  }
440  }
441 
442  if (rank == 0) {
443  free(mm_data.A);
444  free(mm_data.C);
445  }
446  free(mm_data.B);
447 
448  kmr_free_context(mr);
449 
450  get_time(&end);
451 
452 #ifdef TIMING
453  if (rank == 0) {
454  fprintf(stderr, "finalize: %u\n", time_diff(&end, &begin));
455  }
456 #endif
457 
458  MPI_Finalize();
459  return 0;
460 }
Key-Value Stream (abstract).
Definition: kmr.h:587
Options to Mapping, Shuffling, and Reduction.
Definition: kmr.h:613
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
Definition: kmrbase.c:751
int kmr_find_key(KMR_KVS *kvi, struct kmr_kv_box ki, struct kmr_kv_box *vo)
Finds a key-value pair for a key.
Definition: kmrmoreops.c:43
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
Definition: kmr.h:71
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
Definition: kmrbase.c:2036
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
Definition: kmrbase.c:881
KMR Context.
Definition: kmr.h:222
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
Definition: kmrbase.c:621
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Definition: kmr.h:82
Handy Copy of a Key-Value Field.
Definition: kmr.h:358
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
Definition: kmrbase.c:326
KMR Interface.
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gat...
Definition: kmrbase.c:2182
int kmr_map_on_rank_zero(KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps on rank0 only.
Definition: kmrbase.c:1456
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).
Definition: kmrbase.c:147