30 #include "mapreduce.h" 43 int sum(
const struct kmr_kv_box kv[],
const long n,
55 int main(
int argc,
char **argv)
59 MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &thlv);
60 MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
61 MPI_Comm_rank(MPI_COMM_WORLD, &me);
66 if (me == 0) printf(
"Syntax: wordfreq file1 file2 ...\n");
67 MPI_Abort(MPI_COMM_WORLD, 1);
74 MPI_Barrier(MPI_COMM_WORLD);
75 double tstart = MPI_Wtime();
79 struct kmr_option inspect = {.nothreading = 1, .inspect = 1};
80 struct kmr_option nothreading = {.nothreading = 1};
91 kvs0, 0, kmr_noopt, fileread);
92 assert(cc == MPI_SUCCESS);
100 assert(cc == MPI_SUCCESS);
104 cc =
kmr_reduce(kvs0, kvs1, 0, kmr_noopt, sum);
105 assert(cc == MPI_SUCCESS);
110 assert(cc == MPI_SUCCESS);
112 cc =
kmr_reduce(kvs2, kvs3, 0, kmr_noopt, sum);
113 assert(cc == MPI_SUCCESS);
117 assert(cc == MPI_SUCCESS);
119 MPI_Barrier(MPI_COMM_WORLD);
120 double tstop = MPI_Wtime();
124 cc =
kmr_sort(kvs3, kvs4, kmr_noopt);
125 assert(cc == MPI_SUCCESS);
134 cc =
kmr_map(kvs4, kvs5, &count, nothreading, output);
135 assert(cc == MPI_SUCCESS);
140 assert(cc == MPI_SUCCESS);
144 cc =
kmr_sort(kvs6, kvs7, kmr_noopt);
145 assert(cc == MPI_SUCCESS);
152 cc =
kmr_map(kvs7, 0, &count, inspect, output);
153 assert(cc == MPI_SUCCESS);
161 printf(
"%ld total words, %ld unique words\n", nwords, nunique);
162 printf(
"Time to process %d files on %d procs = %g (secs)\n",
163 nfiles, nprocs, tstop - tstart);
183 const char *fname = kv.k.p;
185 int flag = stat(fname, &stbuf);
187 printf(
"ERROR: Could not query file size\n");
188 MPI_Abort(MPI_COMM_WORLD, 1);
190 size_t filesize = (size_t)stbuf.st_size;
192 FILE *fp = fopen(fname,
"r");
193 char *text = malloc(filesize + 1);
195 printf(
"ERROR: malloc failed\n");
196 MPI_Abort(MPI_COMM_WORLD, 1);
198 size_t nchar = fread(text, 1, filesize, fp);
199 if (nchar != filesize) {
200 printf(
"ERROR: fread returned in the middle\n");
201 MPI_Abort(MPI_COMM_WORLD, 1);
206 char *whitespace =
" .,;()<>-/0123456789\"\014\t\n\f\r\0";
207 char *word = strtok(text, whitespace);
210 int len = (int)(strlen(word) + 1);
212 .vlen =
sizeof(long),
216 assert(cc == MPI_SUCCESS);
217 word = strtok(NULL, whitespace);
232 int sum(
const struct kmr_kv_box kv[],
const long n,
238 for (
long i = 0; i < n; i++) {
242 .vlen =
sizeof(long),
246 assert(cc == MPI_SUCCESS);
256 int ncompare(
char *p1,
int len1,
char *p2,
int len2)
258 int i1 = *(
int *) p1;
259 int i2 = *(
int *) p2;
260 if (i1 > i2)
return -1;
261 else if (i1 < i2)
return 1;
281 if (count->n > count->limit) {
287 printf(
"%d %s\n", n, kv.k.p);
291 .vlen =
sizeof(long),
295 assert(cc == MPI_SUCCESS);
KMR_KVS
Key-Value Stream (abstract).
int kmr_map_file_names(KMR *mr, char **names, int n, struct kmr_file_option fopt, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps on file names.
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs.
struct kmr_option
Options to Mapping, Shuffling, and Reduction.
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
struct kmr_kv_box
Handy Copy of a Key-Value Field.
int kmr_fin(void)
Clears the environment.
int kmr_get_element_count(KMR_KVS *kvs, long *v)
Gets the total number of key-value pairs.
#define kmr_init()
Sets up the environment.
int kmr_sort(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Sorts a key-value stream globally.
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
struct kmr_file_option
Options to Mapping on Files.
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gather.
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).