KMR
test2.c
1 /* test2.c (2014-02-04) */
2 
3 /* Check memory leakage by displaying "ps". */
4 
5 /* Run it with "mpirun -np n a.out", and watch output. */
6 
7 #include <mpi.h>
8 #include <stdio.h>
9 #include <stdlib.h>
10 #include <unistd.h>
11 #include <sys/time.h>
12 #include "kmr.h"
13 
/* Returns the wall-clock time in seconds elapsed since the first call
   of this function.  The first call records the time origin, and
   thus it returns zero. */

static double
wtime()
{
    static struct timeval origin = {.tv_sec = 0};
    struct timeval now;
    int rc = gettimeofday(&now, 0);
    assert(rc == 0);
    /* A zero tv_sec marks that the origin is not yet recorded. */
    if (origin.tv_sec == 0) {
        origin = now;
        assert(origin.tv_sec != 0);
    }
    double sec = (double)(now.tv_sec - origin.tv_sec);
    double usec = (double)(now.tv_usec - origin.tv_usec);
    return (sec + (usec * 1e-6));
}
30 
31 /* Puts 200 key-value pairs to output KVO. It is a map-function. It
32  runs only on rank0. Inputs (KV0 and KVS0) are dummy. */
33 
34 static int
35 addkeysfn(const struct kmr_kv_box kv0,
36  const KMR_KVS *kvs0, KMR_KVS *kvo, void *p, const long ind)
37 {
38  assert(kvs0 == 0 && kv0.klen == 0 && kv0.vlen == 0 && kvo != 0);
39  char k[80];
40  char v[80];
41  int cc;
42  for (int i = 0; i < 200; i++) {
43  snprintf(k, 80, "key%d", i);
44  snprintf(v, 80, "value%d", i);
45  struct kmr_kv_box kv = {
46  .klen = (int)(strlen(k) + 1),
47  .vlen = (int)(strlen(v) + 1),
48  .k.p = k,
49  .v.p = v
50  };
51  cc = kmr_add_kv(kvo, kv);
52  assert(cc == MPI_SUCCESS);
53  }
54  return MPI_SUCCESS;
55 }
56 
57 static int
58 replacevaluefn(const struct kmr_kv_box kv0,
59  const KMR_KVS *kvs0, KMR_KVS *kvo, void *p,
60  const long i)
61 {
62  assert(kvs0 != 0 && kvo != 0);
63  int cc, x;
64  char gomi;
65  cc = sscanf((&((char *)kv0.k.p)[3]), "%d%c", &x, &gomi);
66  assert(cc == 1);
67  char v[80];
68  snprintf(v, 10, "newvalue%d", x);
69  struct kmr_kv_box kv = {.klen = kv0.klen,
70  .vlen = (int)(strlen(v) + 1),
71  .k.p = kv0.k.p,
72  .v.p = v
73  };
74  cc = kmr_add_kv(kvo, kv);
75  assert(cc == MPI_SUCCESS);
76  return MPI_SUCCESS;
77 }
78 
79 static int
80 emptyreducefn(const struct kmr_kv_box kv[], const long n,
81  const KMR_KVS *kvs, KMR_KVS *kvo, void *p)
82 {
83  return MPI_SUCCESS;
84 }
85 
86 /* Do KMR operations many times. */
87 
88 static void
89 simple0(int nprocs, int rank)
90 {
91  int cc;
92 
93  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
94 
95  double t0, t1;
96  t0 = wtime();
97 
98  for (int i = 0; i < 10000; i++) {
99 
100  /* Check timeout. */
101 
102  t1 = wtime();
103  KMR_KVS *to0 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
104  if (rank == 0) {
105  struct kmr_kv_box kv = {
106  .klen = (int)sizeof(long),
107  .vlen = (int)sizeof(long),
108  .k.i = 0,
109  .v.i = ((t1 - t0) > 20.0)
110  };
111  cc = kmr_add_kv(to0, kv);
112  assert(cc == MPI_SUCCESS);
113  }
114  cc = kmr_add_kv_done(to0);
115  assert(cc == MPI_SUCCESS);
116  KMR_KVS *to1 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
117  cc = kmr_replicate(to0, to1, kmr_noopt);
118  assert(cc == MPI_SUCCESS);
119  struct kmr_kv_box tok = {.klen = (int)sizeof(long), .k.p = 0,
120  .vlen = 0, .v.p = 0};
121  struct kmr_kv_box tov;
122  cc = kmr_find_key(to1, tok, &tov);
123  assert(cc == MPI_SUCCESS);
124  cc = kmr_free_kvs(to1);
125  assert(cc == MPI_SUCCESS);
126  if (tov.v.i) {
127  if (rank == 0) {
128  printf("loops %d\n", i);
129  }
130  break;
131  }
132 
133  /* Put some pairs. */
134 
135  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
136  cc = kmr_map_on_rank_zero(kvs0, 0, kmr_noopt, addkeysfn);
137  assert(cc == MPI_SUCCESS);
138 
139  /* Replicate pairs to all ranks. */
140 
141  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
142  cc = kmr_replicate(kvs0, kvs1, kmr_noopt);
143  assert(cc == MPI_SUCCESS);
144 
145  /* Map pairs. */
146 
147  KMR_KVS *kvs2 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
148  cc = kmr_map(kvs1, kvs2, 0, kmr_noopt, replacevaluefn);
149  assert(cc == MPI_SUCCESS);
150 
151  /* Collect pairs by theirs keys. */
152 
153  KMR_KVS *kvs3 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
154  cc = kmr_shuffle(kvs2, kvs3, kmr_noopt);
155  assert(cc == MPI_SUCCESS);
156 
157  /* Reduce collected pairs. */
158 
159  KMR_KVS *kvs4 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
160  cc = kmr_reduce(kvs3, kvs4, 0, kmr_noopt, emptyreducefn);
161  assert(cc == MPI_SUCCESS);
162 
163  cc = kmr_free_kvs(kvs4);
164  assert(cc == MPI_SUCCESS);
165  }
166 
167  cc = kmr_free_context(mr);
168  assert(cc == MPI_SUCCESS);
169 }
170 
171 int
172 main(int argc, char *argv[])
173 {
174  char cmd[256];
175  int pid = getpid();
176  int N = 8;
177 
178  int nprocs, rank, thlv;
179  /*MPI_Init(&argc, &argv);*/
180  MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &thlv);
181  MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
182  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
183 
184  kmr_init();
185 
186  MPI_Barrier(MPI_COMM_WORLD);
187  if (rank == 0) {printf("Check leakage by observing heap size.\n");}
188  if (rank == 0) {printf("Watch VSZ changes (loops %d times)...\n", N);}
189  if (rank == 0) {printf("(Each loop will take approx. 20 sec).\n");}
190  fflush(0);
191  usleep(50 * 1000);
192  MPI_Barrier(MPI_COMM_WORLD);
193 
194  for (int i = 0; i < N; i++) {
195  simple0(nprocs, rank);
196 
197  MPI_Barrier(MPI_COMM_WORLD);
198  if (rank == 0) {
199  snprintf(cmd, sizeof(cmd), "ps l %d", pid);
200  system(cmd);
201  }
202  fflush(0);
203  }
204 
205  MPI_Barrier(MPI_COMM_WORLD);
206  if (rank == 0) printf("OK\n");
207  fflush(0);
208 
209  kmr_fin();
210 
211  MPI_Finalize();
212 
213  return 0;
214 }
Key-Value Stream (abstract).
Definition: kmr.h:587
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs.
Definition: kmr.h:88
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
Definition: kmrbase.c:751
int kmr_find_key(KMR_KVS *kvi, struct kmr_kv_box ki, struct kmr_kv_box *vo)
Finds a key-value pair for a key.
Definition: kmrmoreops.c:43
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
Definition: kmr.h:71
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
Definition: kmrbase.c:2036
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
Definition: kmrbase.c:881
KMR Context.
Definition: kmr.h:222
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
Definition: kmrbase.c:621
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Definition: kmr.h:82
Handy Copy of a Key-Value Field.
Definition: kmr.h:358
int kmr_fin(void)
Clears the environment.
Definition: kmrbase.c:124
#define kmr_init()
Sets up the environment.
Definition: kmr.h:747
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
Definition: kmrbase.c:326
KMR Interface.
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gather...
Definition: kmrbase.c:2182
int kmr_map_on_rank_zero(KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps on rank0 only.
Definition: kmrbase.c:1456
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).
Definition: kmrbase.c:147