KMR
kmrshuffler.c
Go to the documentation of this file.
1 /* kmrshuller.c */
2 /* Copyright (C) 2012-2016 RIKEN AICS */
3 
4 /** \file kmrshuffler.c KMR-Shell Shuffler. It is a processor for
5  map-reduce by shell command pipelining ("streaming"). It works on
6  stdin and stdout, with one line for one key-value pair. It reads
7  lines of key-value pairs from stdin, shuffles the pairs, and
8  writes lines of key-value pairs to stdout. The fields of a
9  key-value pair are separated by a whitespace. Lines with the same
10  keys constitutes consecutive lines in the output for reduction.
11  It is a simple application. Lines are limited to 32K bytes and
12  have no escaping of whitespaces in keys. */
13 
14 #include <mpi.h>
15 #include <stdio.h>
16 #include <stdlib.h>
17 #include <unistd.h>
18 #include "kmr.h"
19 
20 /** Maximum length of a line of data. */
21 #define LINELEN 32767
22 
23 /** Reads-in key-value lines from stdin into KVS. It reads a line
24  from mapper output, and sets it as a key-value pair in KVS. */
25 
26 static int
27 streaminputfn(const struct kmr_kv_box kv0,
28  const KMR_KVS *kvs0, KMR_KVS *kvo, void *p, const long i_)
29 {
30  char line[LINELEN];
31  long missingnl = 0;
32  long badlines = 0;
33  assert(kvs0 == 0 && kv0.klen == 0 && kv0.vlen == 0 && kvo != 0);
34  while (fgets(line, sizeof(line), stdin) != NULL) {
35  int cc;
36  char *cp0 = strchr(line, '\n');
37  if (cp0 != NULL) {
38  /* Chomp. */
39  assert(*cp0 == '\n');
40  *cp0 = '\0';
41  } else {
42  missingnl++;
43  }
44  char *cp1 = strchr(line, ' ');
45  if (cp1 == NULL) {
46  /* No value field. */
47  badlines++;
48  continue;
49  }
50  *cp1 = '\0';
51  char *key = line;
52  char *value = (cp1 + 1);
53  struct kmr_kv_box kv;
54  kv.klen = (int)strlen(key) + 1;
55  kv.vlen = (int)strlen(value) + 1;
56  kv.k.p = key;
57  kv.v.p = value;
58  cc = kmr_add_kv(kvo, kv);
59  assert(cc == MPI_SUCCESS);
60  }
61  if (missingnl) {
62  fprintf(stderr, ("kmrshuller: warning: "
63  "Line too long or missing last newline.\n"));
64  }
65  if (badlines) {
66  fprintf(stderr, ("kmrshuller: warning: "
67  "Some lines have no pairs (ignored).\n"));
68  }
69  return MPI_SUCCESS;
70 }
71 
72 /** Writes-out key-value pairs in KVS to stdout. */
73 
74 static int
75 streamoutputfn(const struct kmr_kv_box kv[], const long n,
76  const KMR_KVS *kvs, KMR_KVS *kvo, void *p)
77 {
78  long *badlines = p;
79  char *cp1 = strchr(kv[0].k.p, ' ');
80  if (cp1 != NULL) {
81  (*badlines)++;
82  }
83  for (long i = 0; i < n; i++) {
84  printf("%s %s\n", kv[i].k.p, kv[i].v.p);
85  }
86  return MPI_SUCCESS;
87 }
88 
89 /** Runs KMR shuffler for streaming map-reduce. It reads maped data
90  from stdin, shuffles, and prints shuffled data to stdout. */
91 
92 int
93 main(int argc, char *argv[])
94 {
95  int nprocs, rank, thlv;
96  int cc;
97 
98  struct kmr_option opt_nothreading;
99  opt_nothreading.nothreading = 1;
100 
101  /*MPI_Init(&argc, &argv);*/
102  MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &thlv);
103  MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
104  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
105  kmr_init();
106 
107  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
108  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
109 
110  /* Read mapper output. */
111 
112  cc = kmr_map_once(kvs0, 0, opt_nothreading, 0, streaminputfn);
113  assert(cc == MPI_SUCCESS);
114 
115  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
116  cc = kmr_shuffle(kvs0, kvs1, kmr_noopt);
117  assert(cc == MPI_SUCCESS);
118 
119  /* Output pairs to reducer. */
120 
121  long badlines = 0;
122  cc = kmr_reduce(kvs1, 0, &badlines, opt_nothreading, streamoutputfn);
123  assert(cc == MPI_SUCCESS);
124  if (badlines > 0) {
125  fprintf(stderr, ("kmrshuller: warning: "
126  "Some keys have whitespaces (ignored).\n"));
127  }
128 
129  kmr_free_context(mr);
130  kmr_fin();
131  MPI_Finalize();
132  return 0;
133 }
134 
135 /*
136 Copyright (C) 2012-2016 RIKEN AICS
137 This library is distributed WITHOUT ANY WARRANTY. This library can be
138 redistributed and/or modified under the terms of the BSD 2-Clause License.
139 */
Key-Value Stream (abstract).
Definition: kmr.h:587
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs.
Definition: kmr.h:88
Options to Mapping, Shuffling, and Reduction.
Definition: kmr.h:613
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
Definition: kmrbase.c:751
int kmr_map_once(KMR_KVS *kvo, void *arg, struct kmr_option opt, _Bool rank_zero_only, kmr_mapfn_t m)
Maps once.
Definition: kmrbase.c:1402
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
Definition: kmr.h:71
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
Definition: kmrbase.c:2036
KMR Context.
Definition: kmr.h:222
Handy Copy of a Key-Value Field.
Definition: kmr.h:358
int kmr_fin(void)
Clears the environment.
Definition: kmrbase.c:124
#define kmr_init()
Sets up the environment.
Definition: kmr.h:747
static int streaminputfn(const struct kmr_kv_box kv0, const KMR_KVS *kvs0, KMR_KVS *kvo, void *p, const long i_)
Reads-in key-value lines from stdin into KVS.
Definition: kmrshuffler.c:27
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
Definition: kmrbase.c:326
KMR Interface.
int main(int argc, char *argv[])
Runs KMR shuffler for streaming map-reduce.
Definition: kmrshuffler.c:93
#define LINELEN
Maximum length of a line of data.
Definition: kmrshuffler.c:21
static int streamoutputfn(const struct kmr_kv_box kv[], const long n, const KMR_KVS *kvs, KMR_KVS *kvo, void *p)
Writes-out key-value pairs in KVS to stdout.
Definition: kmrshuffler.c:75
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).
Definition: kmrbase.c:147