KMR
kmriolb.c
Go to the documentation of this file.
1 /* kmriolb.c (2014-08-08) */
2 /* Copyright (C) 2012-2016 RIKEN AICS */
3 
4 /** \file kmriolb.c load balanced MapReduce based on data locality */
5 
6 #include <mpi.h>
7 #include <assert.h>
8 #include <math.h>
9 #ifdef _OPENMP
10 #include <omp.h>
11 #endif
12 
13 #include "../config.h"
14 #include "kmr.h"
15 #include "kmrimpl.h"
16 #include "kmrfefs.h"
17 
18 #ifdef __K
19 #include <mpi-ext.h>
20 
21 
/* Extent of the IO-group grid of the K computer: MAX_IO_GRPS is
   82944 nodes / 192 nodes per IO group = 432 = 24 x 18 groups. */
enum {
    K_MAX_X = 24,
    K_MAX_Y = 18,
    MAX_IO_GRPS = K_MAX_X * K_MAX_Y
};

/* Tally of file chunks located in one IO group. */
struct io_grp {
    int gid_x;  /* x coordinate of the IO group */
    int gid_y;  /* y coordinate of the IO group */
    int count;  /* number of file chunks in this group */
};
31 
32 /* Calculates (x,y) coordinates of the specified rank.
33  */
34 static int
35 kmr_iolb_calc_xy_of_rank(int rank)
36 {
37  int x, y, z, a, b, c;
38  int cc = FJMPI_Topology_sys_rank2xyzabc(rank, &x, &y, &z, &a, &b, &c);
39  assert(cc == MPI_SUCCESS);
40  return (x << 16) + y;
41 }
42 
43 /* Loads file stripe information of the specified file.
44  It reads the file chunks' (x,y) coordinates and counts them.
45 */
46 static void
47 kmr_iolb_load_stripe(const char *filename,
48  struct io_grp *grps, int *grps_cnt)
49 {
50  size_t len = strlen(filename);
51  char *path = (char *)malloc((len + 1) * sizeof(char));
52  memcpy(path, filename, (len + 1));
53  char *d = path;
54  char *f = NULL;
55  for (char *p = &path[len - 1]; p >= path; p--) {
56  if (*p == '/') {
57  f = (p + 1);
58  *p = 0;
59  break;
60  }
61  }
62  if (f == NULL) {
63  /* No directory part in file name. */
64  d = ".";
65  f = path;
66  }
67  struct kmr_fefs_stripe stripe;
68  int errori = 0;
69  int cc = kmr_fefs_get_stripe(d, f, &stripe, &errori, 0);
70  assert(cc == 0);
71  free(path);
72 
73  for (int i = 0; i < stripe.s.count; i++) {
74  int x = stripe.obdidx[i] / 2048;
75  int y = (stripe.obdidx[i] % 2048) / 64;
76  _Bool added = 0;
77  for (int j = 0; j < *grps_cnt; j++) {
78  if (x == grps[j].gid_x && y == grps[j].gid_y) {
79  grps[j].count += 1;
80  added = 1;
81  break;
82  }
83  }
84  if (!added) {
85  grps[*grps_cnt].gid_x = x;
86  grps[*grps_cnt].gid_y = y;
87  grps[*grps_cnt].count = 1;
88  *grps_cnt += 1;
89  }
90  }
91 }
92 
93 /* Shifts (x,y) coordinates of IO groups so that the coordinates will be
94  contiguous.
95 */
96 static void
97 kmr_iolb_shift_grps(struct io_grp *grps, int grps_cnt,
98  int *shift_x, int *shift_y)
99 {
100  int xs[K_MAX_X] = { 0 };
101  int ys[K_MAX_Y] = { 0 };
102  for (int i = 0; i < grps_cnt; i++) {
103  xs[grps[i].gid_x] = 1;
104  ys[grps[i].gid_y] = 1;
105  }
106  if (xs[0] == 1 && xs[K_MAX_X - 1] == 1) {
107  for (int i = K_MAX_X - 1; i >= 0; i--) {
108  if (xs[i] == 1) {
109  *shift_x += 1;
110  }
111  if (xs[i] == 0) {
112  break;
113  }
114  }
115  for (int i = 0; i < grps_cnt; i++) {
116  grps[i].gid_x = (grps[i].gid_x + *shift_x) % K_MAX_X;
117  }
118  }
119  if (ys[0] == 1 && ys[K_MAX_Y - 1] == 1) {
120  for (int i = K_MAX_Y - 1; i >= 0; i--) {
121  if (ys[i] == 1) {
122  *shift_y += 1;
123  }
124  if (ys[i] == 0) {
125  break;
126  }
127  }
128  for (int i = 0; i < grps_cnt; i++) {
129  grps[i].gid_y = (grps[i].gid_y + *shift_y) % K_MAX_Y;
130  }
131  }
132 }
133 
134 /* Unshifts (x,y) coordinates of IO groups shifted by kmr_iolb_shift_grps()
135  to restore the original coordinates.
136 */
137 static void
138 kmr_iolb_unshift_xy(int *x, int *y, int shift_x, int shift_y)
139 {
140  if (shift_x > 0) {
141  int sub_x = *x - shift_x;
142  *x = (sub_x < 0)? sub_x + K_MAX_X : sub_x;
143  }
144  if (shift_y > 0) {
145  int sub_y = *y - shift_y;
146  *y = (sub_y < 0)? sub_y + K_MAX_Y : sub_y;
147  }
148 }
149 
150 /* Finds location of files specified by a value of a key-value pair.
151  It generates a key-value pair whose key is (x,y) coordinates and value is
152  filename(s).
153 */
154 static int
155 kmr_iolb_find_file_location_k(const struct kmr_kv_box kv, const KMR_KVS *kvi,
156  KMR_KVS *kvo, void *p, long i_)
157 {
158  struct io_grp grps[MAX_IO_GRPS];
159  int grps_cnt = 0;
160  char *p1 = (char *)kv.v.p;
161  for (char *p2 = p1; p2 < kv.v.p + kv.vlen; p2++) {
162  if (*p2 == '\0') {
163  kmr_iolb_load_stripe(p1, grps, &grps_cnt);
164  p1 = p2 + 1;
165  }
166  }
167  int shift_x = 0, shift_y = 0;
168  kmr_iolb_shift_grps(grps, grps_cnt, &shift_x, &shift_y);
169  int x_sum = 0, y_sum = 0, cnt_sum = 0;
170  for (int i = 0; i < grps_cnt; i++) {
171  x_sum += grps[i].gid_x * grps[i].count;
172  y_sum += grps[i].gid_y * grps[i].count;
173  cnt_sum += grps[i].count;
174  }
175  int x = (int)ceil((double)x_sum / cnt_sum);
176  int y = (int)ceil((double)y_sum / cnt_sum);
177  kmr_iolb_unshift_xy(&x, &y, shift_x, shift_y);
178 
179  int file_xy = (x << 16) + y;
180  struct kmr_kv_box nkv = { .klen = sizeof(long),
181  .vlen = kv.vlen,
182  .k.i = file_xy,
183  .v.p = kv.v.p };
184  int cc = kmr_add_kv(kvo, nkv);
185  assert(cc == MPI_SUCCESS);
186 
187  _Bool tracing5 = (kvi->c.mr->trace_iolb && (5 <= kvi->c.mr->verbosity));
188  if (tracing5) {
189  char *filename = (char *)malloc((size_t)kv.vlen * sizeof(char));
190  memcpy(filename, kv.v.p, (size_t)kv.vlen);
191  for (int i = 0; i < kv.vlen; i++) {
192  if (filename[i] == '\0') {
193  filename[i] = ' ';
194  }
195  }
196  filename[kv.vlen - 1] = '\0';
197  fprintf(stderr,
198  ";;KMR IOLB [%05d]: Group[%d] - File[%s]\n",
199  kvi->c.mr->rank, file_xy, filename);
200  fflush(stderr);
201  free(filename);
202  }
203 
204  return MPI_SUCCESS;
205 }
206 
207 /* Prints assigned rank and files pairs.
208  */
209 static int
210 kmr_iolb_print_assigned_files(const struct kmr_kv_box kv, const KMR_KVS *kvi,
211  KMR_KVS *kvo, void *p, long i_)
212 {
213  int rank = (int)kv.k.i;
214  char *filename = (char *)malloc((size_t)kv.vlen * sizeof(char));
215  memcpy(filename, kv.v.p, (size_t)kv.vlen);
216  for (int i = 0; i < kv.vlen; i++) {
217  if (filename[i] == '\0') {
218  filename[i] = ' ';
219  }
220  }
221  filename[kv.vlen - 1] = '\0';
222  fprintf(stderr, ";;KMR IOLB [%05d]: Rank[%05d] - File[%s]\n",
223  rank, rank, filename);
224  fflush(stderr);
225  free(filename);
226  return MPI_SUCCESS;
227 }
228 
229 /* Checks if values of the specified two key-value pair is same.
230  */
231 static _Bool
232 kmr_iolb_value_is_equal(const struct kmr_kv_box kv1,
233  const struct kmr_kv_box kv2)
234 {
235  if (kv1.vlen != kv2.vlen) {
236  return 0;
237  }
238  for (int i = 0; i < kv1.vlen; i++) {
239  if (kv1.v.p[i] != kv2.v.p[i]) {
240  return 0;
241  }
242  }
243  return 1;
244 }
245 #endif /*__K*/
246 
247 /** Assigns files to ranks based on data locality. It assumes that values
248  of key-value pairs in the input KVS are file paths and it shuffles the
249  key-value pairs and writes results to the output KVS so that the files
250  are assigned to near ranks. If the value of a key-value pair is file
251  paths separated by '\0', it will find a rank near from all the files
252  specified in the value. Currently, it only works on the K computer.
253  On the other systems, it just performs kmr_shuffle().
254  Effective-options: INSPECT, TAKE_CKPT. See struct kmr_option. */
255 
256 int
257 kmr_assign_file(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
258 {
259 #ifdef __K
260  KMR *mr = kvi->c.mr;
261  struct kmr_option kmr_supported = {.inspect = 1, .take_ckpt = 1};
262  kmr_check_fn_options(mr, kmr_supported, opt, __func__);
263  if (kmr_ckpt_enabled(mr)) {
264  if (kmr_ckpt_progress_init(kvi, kvo, opt)) {
265  if (!opt.keep_open) {
266  kmr_add_kv_done(kvo);
267  }
268  if (!opt.inspect) {
269  kmr_free_kvs(kvi);
270  }
271  return MPI_SUCCESS;
272  }
273  }
274  int kcdc = kmr_ckpt_disable_ckpt(mr);
275  _Bool tracing5 = (mr->trace_iolb && (5 <= mr->verbosity));
276  /*----------------------------------------------------------*/
277 
278  /* Create <iog_id,rank> kvs and shuffle it */
279  int rank_xy = kmr_iolb_calc_xy_of_rank(mr->rank);
280  KMR_KVS *kvs_myrank = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
281  struct kmr_kv_box kv_rnk = { .klen = sizeof(long),
282  .vlen = sizeof(long),
283  .k.i = rank_xy,
284  .v.i = mr->rank };
285  int cc = kmr_add_kv(kvs_myrank, kv_rnk);
286  assert(cc == MPI_SUCCESS);
287  cc = kmr_add_kv_done(kvs_myrank);
288  assert(cc == MPI_SUCCESS);
289  KMR_KVS *kvs_rank = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
290  cc = kmr_shuffle(kvs_myrank, kvs_rank, kmr_noopt);
291  assert(cc == MPI_SUCCESS);
292  if (tracing5) {
293  fprintf(stderr, ";;KMR IOLB [%05d]: Group[%d] - Rank[%05d]\n",
294  mr->rank, rank_xy, mr->rank);
295  fflush(stderr);
296  }
297 
298  /* Create <iog_id,file_name> kvs and shuffle it */
299  enum kmr_kv_field keyf = kvi->c.key_data;
300  enum kmr_kv_field valf = kvi->c.value_data;
301  assert(valf == KMR_KV_OPAQUE || valf == KMR_KV_CSTRING);
302  KMR_KVS *kvs_each_file = kmr_create_kvs(mr, keyf, valf);
303  struct kmr_option inspect = {.inspect = 1};
304  cc = kmr_shuffle(kvi, kvs_each_file, inspect);
305  assert(cc == MPI_SUCCESS);
306  KMR_KVS *kvs_fileloc_each = kmr_create_kvs(mr, KMR_KV_INTEGER,
307  KMR_KV_OPAQUE);
308  cc = kmr_map(kvs_each_file, kvs_fileloc_each, NULL, kmr_noopt,
309  kmr_iolb_find_file_location_k);
310  assert(cc == MPI_SUCCESS);
311  KMR_KVS *kvs_fileloc = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
312  cc = kmr_shuffle(kvs_fileloc_each, kvs_fileloc, kmr_noopt);
313  assert(cc == MPI_SUCCESS);
314 
315  /* Merge <iog_id,rank> kvs and <iog_id,file_name> kvs to
316  create <rank, file_name> kvs */
317  long nranks, nfilelocs;
318  cc = kmr_local_element_count(kvs_rank, &nranks);
319  assert(cc == MPI_SUCCESS);
320  cc = kmr_local_element_count(kvs_fileloc, &nfilelocs);
321  assert(cc == MPI_SUCCESS);
322 
323  KMR_KVS *kvs_map = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
324  if (nranks > 0 && nfilelocs > 0) {
325  struct kmr_kv_box *kvs_ary1 =
326  (struct kmr_kv_box *)malloc((size_t)nranks * sizeof(struct kmr_kv_box));
327  cc = kmr_map(kvs_rank, NULL, kvs_ary1, inspect, kmr_copy_to_array_fn);
328  assert(cc == MPI_SUCCESS);
329  struct kmr_kv_box *kvs_ary2 =
330  (struct kmr_kv_box *)malloc((size_t)nfilelocs * sizeof(struct kmr_kv_box));
331  cc = kmr_map(kvs_fileloc, NULL, kvs_ary2, inspect, kmr_copy_to_array_fn);
332  assert(cc == MPI_SUCCESS);
333 
334  int n = (int)(nfilelocs / nranks);
335  int r = (int)(nfilelocs % nranks);
336  int asgn_cnt = (n == 0)? r : (int)nranks;
337  int assigned = 0;
338  for (int i = 0; i < asgn_cnt; i++) {
339  long t_rank = kvs_ary1[i].v.i;
340  int cnt = n + ((i < r)? 1 : 0);
341  for (int j = 0; j < cnt; j++) {
342  char *t_file = (char *)kvs_ary2[assigned + j].v.p;
343  int t_file_siz = kvs_ary2[assigned + j].vlen;
344  struct kmr_kv_box nkv = { .klen = sizeof(long),
345  .vlen = t_file_siz,
346  .k.i = t_rank,
347  .v.p = (char *)t_file };
348  kmr_add_kv(kvs_map, nkv);
349  }
350  assigned += cnt;
351  }
352  free(kvs_ary1);
353  free(kvs_ary2);
354  } else {
355  /* TODO
356  nranks > 0 && nfilelocs == 0 :
357  no need to do, or read files from other groups
358  nranks == 0 && nfilelocs > 0 :
359  read files from other groups
360  nranks == 0 && nfilelocs == 0 :
361  no need to do
362  */
363  assert(nfilelocs <= 0);
364  }
365  kmr_add_kv_done(kvs_map);
366  kmr_free_kvs(kvs_rank);
367  kmr_free_kvs(kvs_fileloc);
368 
369  KMR_KVS *kvs_myfile = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_OPAQUE);
370  cc = kmr_shuffle(kvs_map, kvs_myfile, kmr_noopt);
371  assert(cc == MPI_SUCCESS);
372  if (tracing5) {
373  cc = kmr_map(kvs_myfile, NULL, NULL, inspect,
374  kmr_iolb_print_assigned_files);
375  }
376 
377  /* Create <key_in_kvi,assigned_file_name> kvs and save it as kvo> */
378  KMR_KVS *kvi_all = kmr_create_kvs(mr, keyf, valf);
379  cc = kmr_replicate(kvi, kvi_all, inspect);
380  assert(cc == MPI_SUCCESS);
381  long nkvi_all, nkvo;
382  cc = kmr_local_element_count(kvi_all, &nkvi_all);
383  assert(cc == MPI_SUCCESS);
384  struct kmr_kv_box *kvs_ary1 =
385  (struct kmr_kv_box *)malloc((size_t)nkvi_all * sizeof(struct kmr_kv_box));
386  cc = kmr_map(kvi_all, NULL, kvs_ary1, inspect, kmr_copy_to_array_fn);
387  assert(cc == MPI_SUCCESS);
388  cc = kmr_local_element_count(kvs_myfile, &nkvo);
389  assert(cc == MPI_SUCCESS);
390  struct kmr_kv_box *kvs_ary2 =
391  (struct kmr_kv_box *)malloc((size_t)nkvo * sizeof(struct kmr_kv_box));
392  cc = kmr_map(kvs_myfile, NULL, kvs_ary2, inspect, kmr_copy_to_array_fn);
393  assert(cc == MPI_SUCCESS);
394  KMR_OMP_PARALLEL_FOR_
395  for (int i = 0; i < nkvo; i++) {
396  _Bool kv_added = 0;
397  for (int j = 0; j < nkvi_all; j++) {
398  if (kmr_iolb_value_is_equal(kvs_ary1[j], kvs_ary2[i])) {
399  kmr_add_kv(kvo, kvs_ary1[j]);
400  kv_added = 1;
401  break;
402  }
403  }
404  assert(kv_added);
405  }
406  free(kvs_ary1);
407  free(kvs_ary2);
408  assert(kvs_myfile->c.element_count == kvo->c.element_count);
409  kmr_free_kvs(kvs_myfile);
410  kmr_free_kvs(kvi_all);
411 
412  /*----------------------------------------------------------*/
413  kmr_ckpt_enable_ckpt(mr, kcdc);
414  if (!opt.keep_open) {
415  kmr_add_kv_done(kvo);
416  }
417  if (kmr_ckpt_enabled(mr)) {
418  kmr_ckpt_save_kvo_whole(mr, kvo);
419  }
420  if (!opt.inspect) {
421  kmr_free_kvs(kvi);
422  }
423  if (kmr_ckpt_enabled(mr)) {
425  }
426  return MPI_SUCCESS;
427 #else
428  return kmr_shuffle(kvi, kvo, opt);
429 #endif /*__K*/
430 }
431 
432 /*
433 Copyright (C) 2012-2016 RIKEN AICS
434 This library is distributed WITHOUT ANY WARRANTY. This library can be
435 redistributed and/or modified under the terms of the BSD 2-Clause License.
436 */
int kmr_ckpt_progress_init(KMR_KVS *, KMR_KVS *, struct kmr_option)
It initializes the progress of MapReduce checkpointing.
Definition: kmrckpt.c:2753
Key-Value Stream (abstract).
Definition: kmr.h:587
Utilities Private Part (do not include from applications).
Lustre Striping Information with OBDIDX.
Definition: kmrfefs.h:20
int kmr_assign_file(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Assigns files to ranks based on data locality.
Definition: kmriolb.c:257
Options to Mapping, Shuffling, and Reduction.
Definition: kmr.h:613
void kmr_ckpt_progress_fin(KMR *)
It finalizes the progress of MapReduce checkpointing.
Definition: kmrckpt.c:2845
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
Definition: kmrbase.c:751
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
Definition: kmr.h:71
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
Definition: kmrbase.c:2036
int kmr_ckpt_enabled(KMR *)
Check if checkpoint/restart is enabled.
Definition: kmrckpt.c:2478
int kmr_copy_to_array_fn(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long i)
Copies the entry in the array.
Definition: kmrutil.c:934
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
Definition: kmrbase.c:881
KMR Context.
Definition: kmr.h:222
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
Definition: kmrbase.c:621
void kmr_ckpt_save_kvo_whole(KMR *, KMR_KVS *)
It saves all key-value pairs in the output KVS to a checkpoint data file.
Definition: kmrckpt.c:2638
kmr_kv_field
Datatypes of Keys or Values.
Definition: kmr.h:325
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Definition: kmr.h:82
Handy Copy of a Key-Value Field.
Definition: kmr.h:358
int kmr_fefs_get_stripe(const char *dir, const char *file, struct kmr_fefs_stripe *stripe, int *err, _Bool debug_and_dump)
Gets the OBDIDX information on the file or directory.
Definition: kmrfefs.c:82
int kmr_ckpt_disable_ckpt(KMR *)
It temporarily disables checkpoint/restart.
Definition: kmrckpt.c:2494
Lustre File System (or Fujitsu FEFS) Support.
KMR Interface.
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gather.
Definition: kmrbase.c:2182
Definition: kmriolb.c:26
int kmr_local_element_count(KMR_KVS *kvs, long *v)
Gets the number of key-value pairs locally on each rank.
Definition: kmrutil.c:349
int kmr_ckpt_enable_ckpt(KMR *, int)
It temporarily enables checkpoint/restart which has been disabled by calling kmr_ckpt_disable_ckpt().
Definition: kmrckpt.c:2515