KMR
testiolb2.c
1 /*testiolb2 (2014-08-20) */
2 
3 /* A file read benchmark for testing locality-aware file assignment.
4 
5  This program reads files in a specified directory and measures the average
6  time for reading two files on each node.
7 
8  This program can be run as follows.
9 
10  1. create a directory and files
11 
12  $ mkdir ./data
13  $ dd if=/dev/zero of=./data/file000 bs=1M count=100
14  $ dd if=/dev/zero of=./data/file001 bs=1M count=100
15  ...
16 
17  2. run by mpiexec
18 
19  $ mpiexec ./a.out ./data
20 
21  When '-s' option is given, it performs shuffle instead of locality-
22  aware file assignment.
23 */
24 
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <sys/types.h>
29 #include <sys/stat.h>
30 #include <unistd.h>
31 #include <dirent.h>
32 #include <assert.h>
33 #include <mpi.h>
34 #include "kmr.h"
35 
36 #define PATHLEN 256
37 #define READBUFSIZ 1048576 /* 1MB */
38 #define MAXFILES 8192
39 
40 /* KMR map function
41  It creates a kvs whose key is a sequential number starts from 0 and value
42  is a file name in the specified directory.
43 */
44 static int
45 read_files(const struct kmr_kv_box kv,
46  const KMR_KVS *kvi, KMR_KVS *kvo, void *p, long i_)
47 {
48  char *dirname = (char *)p;
49 
50  /* Count files */
51  int file_cnt = 0;
52  DIR *dir = opendir(dirname);
53  assert(dir != NULL);
54  for (struct dirent *dp = readdir(dir); dp != NULL; dp = readdir(dir)) {
55  assert(strlen(dp->d_name) > 0);
56  if (dp->d_name[0] == '.') {
57  continue;
58  }
59  file_cnt += 1;
60  }
61  closedir(dir);
62 
63  /* Load file names */
64  char **files = (char **)malloc(sizeof(char *) * (size_t)file_cnt);
65  dir = opendir(dirname);
66  assert(dir != NULL);
67  int fidx = 0;
68  for (struct dirent *dp = readdir(dir); dp != NULL; dp = readdir(dir)) {
69  assert(strlen(dp->d_name) > 0);
70  if (dp->d_name[0] == '.') {
71  continue;
72  }
73  files[fidx] = (char *)malloc(sizeof(char) * PATHLEN);
74  snprintf(files[fidx], PATHLEN, "%s/%s", dirname, dp->d_name);
75  fidx += 1;
76  }
77  closedir(dir);
78 
79  /* Create kv */
80  for (int i = 0; i < file_cnt; i++) {
81  char *first = files[i];
82  char *second = files[(i + 1) % file_cnt];
83  size_t first_len = strlen(first);
84  size_t second_len = strlen(second);
85  long vlen = (long)first_len + (long)second_len + 2;
86  char *val = (char *)malloc(sizeof(char) * (size_t)vlen);
87  strncpy(val, first, first_len);
88  val[first_len] = '\0';
89  strncpy(&val[first_len + 1], second, second_len);
90  val[vlen - 1] = '\0';
91  struct kmr_kv_box nkv = { .klen = sizeof(long),
92  .vlen = (int)(vlen * (long)sizeof(char)),
93  .k.i = i,
94  .v.p = (void *)val };
95  kmr_add_kv(kvo, nkv);
96  free(val);
97  }
98 
99  /* free all */
100  for (int i = 0; i < file_cnt; i++) {
101  free(files[i]);
102  }
103  free(files);
104 
105  return MPI_SUCCESS;
106 }
107 
108 /* KMR map function
109  It measures times required for reading a file.
110 */
111 static int
112 benchmark(const struct kmr_kv_box kv,
113  const KMR_KVS *kvi, KMR_KVS *kvo, void *p, long i_)
114 {
115  double time_diff = 0.0;
116  char *filename = (char *)kv.v.p;
117  for (char *pp = filename + 1; pp < kv.v.p + kv.vlen; pp++) {
118  if (*pp == '\0') {
119  /* run benchmark */
120  char buf[READBUFSIZ];
121  double t1 = MPI_Wtime();
122  FILE *fp = fopen(filename, "r");
123  assert(fp != NULL);
124  size_t siz;
125  do {
126  siz = fread(buf, sizeof(char), READBUFSIZ, fp);
127  } while (siz != 0);
128  fclose(fp);
129  double t2 = MPI_Wtime();
130  time_diff += t2 - t1;
131 
132  filename = pp + 1;
133  }
134  }
135 
136  struct kmr_kv_box nkv = { .klen = sizeof(long),
137  .vlen = sizeof(double),
138  .k.i = 0,
139  .v.d = time_diff };
140  kmr_add_kv(kvo, nkv);
141  return MPI_SUCCESS;
142 }
143 
144 /* KMR reduce function
145  It calculates the average file read time.
146 */
147 static int
148 summarize(const struct kmr_kv_box kv[], const long n,
149  const KMR_KVS *kvi, KMR_KVS *kvo, void *p)
150 {
151  double sum = 0.0;
152  for (long i = 0; i < n; i++) {
153  printf("%f\n", kv[i].v.d);
154  sum += kv[i].v.d;
155  }
156  double avg = sum / (double)n;
157  printf("Average read time: %f\n", avg);
158  fflush(stdout);
159  return MPI_SUCCESS;
160 }
161 
/*
   Returns 1 if PATH can be stat()ed and refers to a directory.
   Returns 0 otherwise, including when stat() fails.
*/
static _Bool
check_directory(const char *path)
{
    struct stat info;
    return (stat(path, &info) == 0) && S_ISDIR(info.st_mode);
}
179 
/* Prints the usage message to stderr when RANK is 0; any other rank
   prints nothing. */
static void
show_help(int rank)
{
    static const char *const usage_lines[] = {
        "Specify a directory.\n\n",
        "Usage: ./a.out [-s] DIRECTORY\n",
        " -s : perform shuffle to distribute files.\n",
    };

    if (rank != 0) {
        return;
    }
    for (size_t k = 0; k < sizeof usage_lines / sizeof usage_lines[0]; k++) {
        fputs(usage_lines[k], stderr);
    }
}
189 
190 
191 int
192 main(int argc, char **argv)
193 {
194  int nprocs, rank;
195  _Bool noiolb = 0;
196  char *dirname = NULL;
197 
198  MPI_Init(&argc, &argv);
199  MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
200  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
201 
202  if (!(argc == 2 || argc == 3)) {
203  show_help(rank);
204  MPI_Finalize();
205  return 1;
206  } else if (argc == 2) {
207  dirname = argv[1];
208  if (!check_directory(dirname)) {
209  show_help(rank);
210  MPI_Finalize();
211  return 1;
212  }
213  } else if (argc == 3) {
214  int ret = strcmp(argv[1], "-s");
215  if (ret == 0) {
216  noiolb = 1;
217  }
218  dirname = argv[2];
219  if (!check_directory(dirname)) {
220  show_help(rank);
221  MPI_Finalize();
222  return 1;
223  }
224  }
225 
226  kmr_init();
227  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
228  mr->trace_iolb = 1;
229  mr->verbosity = 5;
230 
231  KMR_KVS *kvs_infiles = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
232  kmr_map_once(kvs_infiles, (void *)dirname, kmr_noopt, 1, read_files);
233 
234  KMR_KVS *kvs_targets = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
235  if (noiolb) {
236  /* perform shuffle */
237  kmr_shuffle(kvs_infiles, kvs_targets, kmr_noopt);
238  } else {
239  /* perform file distribution based on locality */
240  kmr_assign_file(kvs_infiles, kvs_targets, kmr_noopt);
241  }
242 
243  KMR_KVS *kvs_times = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_FLOAT8);
244  kmr_map(kvs_targets, kvs_times, NULL, kmr_noopt, benchmark);
245 
246  KMR_KVS *kvs_all_times = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_FLOAT8);
247  kmr_shuffle(kvs_times, kvs_all_times, kmr_noopt);
248 
249  kmr_reduce(kvs_all_times, NULL, NULL, kmr_noopt, summarize);
250 
251  kmr_free_context(mr);
252  kmr_fin();
253  MPI_Finalize();
254  return 0;
255 }
Key-Value Stream (abstract).
Definition: kmr.h:587
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs.
Definition: kmr.h:88
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
Definition: kmrbase.c:751
int kmr_map_once(KMR_KVS *kvo, void *arg, struct kmr_option opt, _Bool rank_zero_only, kmr_mapfn_t m)
Maps once.
Definition: kmrbase.c:1402
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
Definition: kmr.h:71
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
Definition: kmrbase.c:2036
KMR Context.
Definition: kmr.h:222
int kmr_assign_file(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Assigns files to ranks based on data locality.
Definition: kmriolb.c:257
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Definition: kmr.h:82
Handy Copy of a Key-Value Field.
Definition: kmr.h:358
int kmr_fin(void)
Clears the environment.
Definition: kmrbase.c:124
#define kmr_init()
Sets up the environment.
Definition: kmr.h:747
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
Definition: kmrbase.c:326
KMR Interface.
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).
Definition: kmrbase.c:147