33 assert(kvs0 == 0 && kv0.klen == 0 && kv0.vlen == 0 && kvo != 0);
34 while (fgets(line,
sizeof(line), stdin) != NULL) {
36 char *cp0 = strchr(line,
'\n');
44 char *cp1 = strchr(line,
' ');
52 char *value = (cp1 + 1);
54 kv.klen = (int)strlen(key) + 1;
55 kv.vlen = (int)strlen(value) + 1;
59 assert(cc == MPI_SUCCESS);
62 fprintf(stderr, (
"kmrshuller: warning: " 63 "Line too long or missing last newline.\n"));
66 fprintf(stderr, (
"kmrshuller: warning: " 67 "Some lines have no pairs (ignored).\n"));
79 char *cp1 = strchr(kv[0].k.p,
' ');
83 for (
long i = 0; i < n; i++) {
84 printf(
"%s %s\n", kv[i].k.p, kv[i].v.p);
93 main(
int argc,
char *argv[])
95 int nprocs, rank, thlv;
99 opt_nothreading.nothreading = 1;
102 MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &thlv);
103 MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
104 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
113 assert(cc == MPI_SUCCESS);
117 assert(cc == MPI_SUCCESS);
123 assert(cc == MPI_SUCCESS);
125 fprintf(stderr, (
"kmrshuller: warning: " 126 "Some keys have whitespaces (ignored).\n"));
Key-Value Stream (abstract).
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs.
Options to Mapping, Shuffling, and Reduction.
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
int kmr_map_once(KMR_KVS *kvo, void *arg, struct kmr_option opt, _Bool rank_zero_only, kmr_mapfn_t m)
Maps once.
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
Handy Copy of a Key-Value Field.
int kmr_fin(void)
Clears the environment.
#define kmr_init()
Sets up the environment.
static int streaminputfn(const struct kmr_kv_box kv0, const KMR_KVS *kvs0, KMR_KVS *kvo, void *p, const long i_)
Reads-in key-value lines from stdin into KVS.
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
int main(int argc, char *argv[])
Runs KMR shuffler for streaming map-reduce.
#define LINELEN
Maximum length of a line of data.
static int streamoutputfn(const struct kmr_kv_box kv[], const long n, const KMR_KVS *kvs, KMR_KVS *kvo, void *p)
Writes-out key-value pairs in KVS to stdout.
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).