KMR
mrmpi-wordfreq.c
1 /* ----------------------------------------------------------------------
2  MR-MPI = MapReduce-MPI library
3  http://www.cs.sandia.gov/~sjplimp/mapreduce.html
4  Steve Plimpton, sjplimp@sandia.gov, Sandia National Laboratories
5 
6  Copyright (2009) Sandia Corporation. Under the terms of Contract
7  DE-AC04-94AL85000 with Sandia Corporation, the U.S. Government retains
8  certain rights in this software. This software is distributed under
9  the modified Berkeley Software Distribution (BSD) License.
10 
11  See the README file in the top-level MapReduce directory.
12 ------------------------------------------------------------------------- */
13 
14 /* This is an example in "examples" directory in MR-MPI (20Jun11),
15  converted for KMR (2013-04-24). */
16 
17 // MapReduce word frequency example in C (converted from the MR-MPI C++ example)
18 // Syntax: wordfreq file1 dir1 file2 dir2 ...
19 // (1) read all files and files in dirs
20 // (2) parse into words separated by whitespace
21 // (3) count occurrence of each word in all files
22 // (4) print top 10 words
23 
24 #include "mpi.h"
25 #include "stdio.h"
26 #include "stdlib.h"
27 #include "string.h"
28 #include "sys/stat.h"
29 #if 0
30 #include "mapreduce.h"
31 #include "keyvalue.h"
32 #endif
33 #include "kmr.h"
34 
35 /*void fileread(int, char *, KMR_KVS *, void *);*/
36 /*void sum(char *, int, char *, int, int *, KMR_KVS *, void *);*/
37 /*int ncompare(char *, int, char *, int);*/
38 /*void output(uint64_t, char *, int, char *, int, KMR_KVS *, void *);*/
39 
40 int fileread(const struct kmr_kv_box kv,
41  const KMR_KVS *kvi, KMR_KVS *kvo, void *p,
42  const long index);
43 int sum(const struct kmr_kv_box kv[], const long n,
44  const KMR_KVS *kvi, KMR_KVS *kvo, void *p);
45 int output(const struct kmr_kv_box kv,
46  const KMR_KVS *kvi, KMR_KVS *kvo, void *p,
47  const long index);
48 
/* State passed to output() through the map argument pointer. */
struct Count {
    int n;      /* number of pairs output() has been called with so far */
    int limit;  /* pairs beyond this count are ignored by output() */
    int flag;   /* nonzero: print the pair; zero: add it to the output stream */
};
52 
53 /* ---------------------------------------------------------------------- */
54 
55 int main(int argc, char **argv)
56 {
57  /*MPI_Init(&argc, &argv);*/
58  int me, nprocs, thlv;
59  MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &thlv);
60  MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
61  MPI_Comm_rank(MPI_COMM_WORLD, &me);
62 
63  kmr_init();
64 
65  if (argc <= 1) {
66  if (me == 0) printf("Syntax: wordfreq file1 file2 ...\n");
67  MPI_Abort(MPI_COMM_WORLD, 1);
68  }
69 
70  /*MapReduce *mr = new MapReduce(MPI_COMM_WORLD);*/
71  /*mr->timer = 1;*/
72  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
73 
74  MPI_Barrier(MPI_COMM_WORLD);
75  double tstart = MPI_Wtime();
76 
77  int cc;
78 
79  struct kmr_option inspect = {.nothreading = 1, .inspect = 1};
80  struct kmr_option nothreading = {.nothreading = 1};
81  struct kmr_option rankzero = {.rank_zero = 1};
82 
83  /*int nwords = mr->map((argc - 1), &argv[1], 0, 1, 0, fileread, NULL);*/
84  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_INTEGER);
85  struct kmr_file_option fopt = {
86  .each_rank = 0,
87  .subdirectories = 1,
88  .list_file = 0,
89  .shuffle_names = 1};
90  cc = kmr_map_file_names(mr, &argv[1], (argc - 1), fopt,
91  kvs0, 0, kmr_noopt, fileread);
92  assert(cc == MPI_SUCCESS);
93 
94  /* KMR does not count the number of files mapped. */
95  int nfiles = 0;
96  /*int nfiles = mr->mapfilecount;*/
97 
98  long nwords;
99  cc = kmr_get_element_count(kvs0, &nwords);
100  assert(cc == MPI_SUCCESS);
101 
102  /*mr->collate(NULL);*/
103  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_INTEGER);
104  cc = kmr_reduce(kvs0, kvs1, 0, kmr_noopt, sum);
105  assert(cc == MPI_SUCCESS);
106 
107  /*int nunique = mr->reduce(sum, NULL);*/
108  KMR_KVS *kvs2 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_INTEGER);
109  cc = kmr_shuffle(kvs1, kvs2, kmr_noopt);
110  assert(cc == MPI_SUCCESS);
111  KMR_KVS *kvs3 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_INTEGER);
112  cc = kmr_reduce(kvs2, kvs3, 0, kmr_noopt, sum);
113  assert(cc == MPI_SUCCESS);
114 
115  long nunique = 0;
116  cc = kmr_get_element_count(kvs3, &nunique);
117  assert(cc == MPI_SUCCESS);
118 
119  MPI_Barrier(MPI_COMM_WORLD);
120  double tstop = MPI_Wtime();
121 
122  /*mr->sort_values(&ncompare);*/
123  KMR_KVS *kvs4 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_INTEGER);
124  cc = kmr_sort(kvs3, kvs4, kmr_noopt);
125  assert(cc == MPI_SUCCESS);
126 
127  struct Count count;
128  count.n = 0;
129  count.limit = 10;
130  count.flag = 0;
131 
132  /*mr->map(mr, output, &count);*/
133  KMR_KVS *kvs5 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_INTEGER);
134  cc = kmr_map(kvs4, kvs5, &count, nothreading, output);
135  assert(cc == MPI_SUCCESS);
136 
137  /*mr->gather(1);*/
138  KMR_KVS *kvs6 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_INTEGER);
139  cc = kmr_replicate(kvs5, kvs6, rankzero);
140  assert(cc == MPI_SUCCESS);
141 
142  /*mr->sort_values(ncompare);*/
143  KMR_KVS *kvs7 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_INTEGER);
144  cc = kmr_sort(kvs6, kvs7, kmr_noopt);
145  assert(cc == MPI_SUCCESS);
146 
147  count.n = 0;
148  count.limit = 10;
149  count.flag = 1;
150 
151  /*mr->map(mr, output, &count);*/
152  cc = kmr_map(kvs7, 0, &count, inspect, output);
153  assert(cc == MPI_SUCCESS);
154 
155  kmr_free_kvs(kvs7);
156 
157  /*delete mr;*/
158  kmr_free_context(mr);
159 
160  if (me == 0) {
161  printf("%ld total words, %ld unique words\n", nwords, nunique);
162  printf("Time to process %d files on %d procs = %g (secs)\n",
163  nfiles, nprocs, tstop - tstart);
164  }
165 
166  kmr_fin();
167 
168  MPI_Finalize();
169 }
170 
171 /* ----------------------------------------------------------------------
172  read a file
173  for each word in file, emit key = word, value = 1
174 ------------------------------------------------------------------------- */
175 
176 /*void fileread(int itask, char *fname, KeyValue *kv, void *ptr)*/
177 
178 int fileread(const struct kmr_kv_box kv,
179  const KMR_KVS *kvi, KMR_KVS *kvo, void *p,
180  const long index)
181 {
182  int cc;
183  const char *fname = kv.k.p;
184  struct stat stbuf;
185  int flag = stat(fname, &stbuf);
186  if (flag < 0) {
187  printf("ERROR: Could not query file size\n");
188  MPI_Abort(MPI_COMM_WORLD, 1);
189  }
190  size_t filesize = (size_t)stbuf.st_size;
191 
192  FILE *fp = fopen(fname, "r");
193  char *text = malloc(filesize + 1);
194  if (text == 0) {
195  printf("ERROR: malloc failed\n");
196  MPI_Abort(MPI_COMM_WORLD, 1);
197  }
198  size_t nchar = fread(text, 1, filesize, fp);
199  if (nchar != filesize) {
200  printf("ERROR: fread returned in the middle\n");
201  MPI_Abort(MPI_COMM_WORLD, 1);
202  }
203  text[nchar] = '\0';
204  fclose(fp);
205 
206  char *whitespace = " .,;()<>-/0123456789\"\014\t\n\f\r\0";
207  char *word = strtok(text, whitespace);
208  while (word != 0) {
209  /*kv->add(word, strlen(word)+1, NULL, 0);*/
210  int len = (int)(strlen(word) + 1);
211  struct kmr_kv_box akv = {.klen = len,
212  .vlen = sizeof(long),
213  .k.p = word,
214  .v.i = 1};
215  cc = kmr_add_kv(kvo, akv);
216  assert(cc == MPI_SUCCESS);
217  word = strtok(NULL, whitespace);
218  }
219 
220  free(text);
221  return MPI_SUCCESS;
222 }
223 
224 /* ----------------------------------------------------------------------
225  count word occurrence
226  emit key = word, value = sum of the values for that word
227 ------------------------------------------------------------------------- */
228 
229 /*void sum(char *key, int keybytes, char *multivalue,
230  int nvalues, int *valuebytes, KeyValue *kv, void *ptr)*/
231 
232 int sum(const struct kmr_kv_box kv[], const long n,
233  const KMR_KVS *kvi, KMR_KVS *kvo, void *p)
234 {
235  /*kv->add(key, keybytes,(char *) &nvalues, sizeof(int));*/
236  int cc;
237  long cnt = 0;
238  for (long i = 0; i < n; i++) {
239  cnt += kv[i].v.i;
240  }
241  struct kmr_kv_box akv = {.klen = kv[0].klen,
242  .vlen = sizeof(long),
243  .k.p = kv[0].k.p,
244  .v.i = cnt};
245  cc = kmr_add_kv(kvo, akv);
246  assert(cc == MPI_SUCCESS);
247  return MPI_SUCCESS;
248 }
249 
250 /* ----------------------------------------------------------------------
251  compare two counts
252  order values by count, largest first
253 ------------------------------------------------------------------------- */
254 
255 #if 0
/* Compares two counts stored as int at P1 and P2, ordering the larger
   count first: returns -1 when the first count is larger, 1 when it is
   smaller, and 0 when they are equal.  LEN1 and LEN2 are unused.
   The counts are copied out with memcpy() because a char pointer into
   a byte buffer need not be aligned for int. */
int ncompare(char *p1, int len1, char *p2, int len2)
{
    (void)len1;
    (void)len2;

    int i1;
    int i2;
    memcpy(&i1, p1, sizeof i1);
    memcpy(&i2, p2, sizeof i2);

    if (i1 > i2) {
        return -1;
    }
    if (i1 < i2) {
        return 1;
    }
    return 0;
}
264 #endif
265 
266 /* ----------------------------------------------------------------------
267  process a word and its count
268  depending on flag, emit KV or print it, up to limit
269 ------------------------------------------------------------------------- */
270 
271 /*void output(uint64_t itask, char *key, int keybytes, char *value,
272  int valuebytes, KeyValue *kv, void *ptr)*/
273 
274 int output(const struct kmr_kv_box kv,
275  const KMR_KVS *kvi, KMR_KVS *kvo, void *p,
276  const long index)
277 {
278  int cc;
279  struct Count *count = (struct Count *)p;
280  count->n++;
281  if (count->n > count->limit) {
282  return MPI_SUCCESS;
283  }
284  /*int n = *(int *)value;*/
285  int n = (int)kv.v.i;
286  if (count->flag) {
287  printf("%d %s\n", n, kv.k.p);
288  } else {
289  /*kv->add(key, keybytes, (char *)&n, sizeof(int));*/
290  struct kmr_kv_box akv = {.klen = kv.klen,
291  .vlen = sizeof(long),
292  .k.p = kv.k.p,
293  .v.i = n};
294  cc = kmr_add_kv(kvo, akv);
295  assert(cc == MPI_SUCCESS);
296  }
297  return MPI_SUCCESS;
298 }
Key-Value Stream (abstract).
Definition: kmr.h:587
int kmr_map_file_names(KMR *mr, char **names, int n, struct kmr_file_option fopt, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps on file names.
Definition: kmrfiles.c:1372
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs.
Definition: kmr.h:88
Options to Mapping, Shuffling, and Reduction.
Definition: kmr.h:613
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
Definition: kmrbase.c:751
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
Definition: kmr.h:71
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
Definition: kmrbase.c:2036
KMR Context.
Definition: kmr.h:222
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
Definition: kmrbase.c:621
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Definition: kmr.h:82
Handy Copy of a Key-Value Field.
Definition: kmr.h:358
int kmr_fin(void)
Clears the environment.
Definition: kmrbase.c:124
int kmr_get_element_count(KMR_KVS *kvs, long *v)
Gets the total number of key-value pairs.
Definition: kmrmoreops.c:114
#define kmr_init()
Sets up the environment.
Definition: kmr.h:747
int kmr_sort(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Sorts a key-value stream globally.
Definition: kmrmoreops.c:575
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
Definition: kmrbase.c:326
KMR Interface.
Options to Mapping on Files.
Definition: kmr.h:638
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gather.
Definition: kmrbase.c:2182
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).
Definition: kmrbase.c:147