13 #include "../config.h" 24 #define MAX_IO_GRPS 432 35 kmr_iolb_calc_xy_of_rank(
int rank)
38 int cc = FJMPI_Topology_sys_rank2xyzabc(rank, &x, &y, &z, &a, &b, &c);
39 assert(cc == MPI_SUCCESS);
47 kmr_iolb_load_stripe(
const char *filename,
48 struct io_grp *grps,
int *grps_cnt)
50 size_t len = strlen(filename);
51 char *path = (
char *)malloc((len + 1) *
sizeof(char));
52 memcpy(path, filename, (len + 1));
55 for (
char *p = &path[len - 1]; p >= path; p--) {
73 for (
int i = 0; i < stripe.s.count; i++) {
74 int x = stripe.obdidx[i] / 2048;
75 int y = (stripe.obdidx[i] % 2048) / 64;
77 for (
int j = 0; j < *grps_cnt; j++) {
78 if (x == grps[j].gid_x && y == grps[j].gid_y) {
85 grps[*grps_cnt].gid_x = x;
86 grps[*grps_cnt].gid_y = y;
87 grps[*grps_cnt].count = 1;
97 kmr_iolb_shift_grps(
struct io_grp *grps,
int grps_cnt,
98 int *shift_x,
int *shift_y)
100 int xs[K_MAX_X] = { 0 };
101 int ys[K_MAX_Y] = { 0 };
102 for (
int i = 0; i < grps_cnt; i++) {
103 xs[grps[i].gid_x] = 1;
104 ys[grps[i].gid_y] = 1;
106 if (xs[0] == 1 && xs[K_MAX_X - 1] == 1) {
107 for (
int i = K_MAX_X - 1; i >= 0; i--) {
115 for (
int i = 0; i < grps_cnt; i++) {
116 grps[i].gid_x = (grps[i].gid_x + *shift_x) % K_MAX_X;
119 if (ys[0] == 1 && ys[K_MAX_Y - 1] == 1) {
120 for (
int i = K_MAX_Y - 1; i >= 0; i--) {
128 for (
int i = 0; i < grps_cnt; i++) {
129 grps[i].gid_y = (grps[i].gid_y + *shift_y) % K_MAX_Y;
138 kmr_iolb_unshift_xy(
int *x,
int *y,
int shift_x,
int shift_y)
141 int sub_x = *x - shift_x;
142 *x = (sub_x < 0)? sub_x + K_MAX_X : sub_x;
145 int sub_y = *y - shift_y;
146 *y = (sub_y < 0)? sub_y + K_MAX_Y : sub_y;
156 KMR_KVS *kvo,
void *p,
long i_)
158 struct io_grp grps[MAX_IO_GRPS];
160 char *p1 = (
char *)kv.v.p;
161 for (
char *p2 = p1; p2 < kv.v.p + kv.vlen; p2++) {
163 kmr_iolb_load_stripe(p1, grps, &grps_cnt);
167 int shift_x = 0, shift_y = 0;
168 kmr_iolb_shift_grps(grps, grps_cnt, &shift_x, &shift_y);
169 int x_sum = 0, y_sum = 0, cnt_sum = 0;
170 for (
int i = 0; i < grps_cnt; i++) {
171 x_sum += grps[i].gid_x * grps[i].count;
172 y_sum += grps[i].gid_y * grps[i].count;
173 cnt_sum += grps[i].count;
175 int x = (int)ceil((
double)x_sum / cnt_sum);
176 int y = (int)ceil((
double)y_sum / cnt_sum);
177 kmr_iolb_unshift_xy(&x, &y, shift_x, shift_y);
179 int file_xy = (x << 16) + y;
180 struct kmr_kv_box nkv = { .klen =
sizeof(long),
185 assert(cc == MPI_SUCCESS);
187 _Bool tracing5 = (kvi->c.mr->trace_iolb && (5 <= kvi->c.mr->verbosity));
189 char *filename = (
char *)malloc((
size_t)kv.vlen *
sizeof(char));
190 memcpy(filename, kv.v.p, (
size_t)kv.vlen);
191 for (
int i = 0; i < kv.vlen; i++) {
192 if (filename[i] ==
'\0') {
196 filename[kv.vlen - 1] =
'\0';
198 ";;KMR IOLB [%05d]: Group[%d] - File[%s]\n",
199 kvi->c.mr->rank, file_xy, filename);
211 KMR_KVS *kvo,
void *p,
long i_)
213 int rank = (int)kv.k.i;
214 char *filename = (
char *)malloc((
size_t)kv.vlen *
sizeof(
char));
215 memcpy(filename, kv.v.p, (
size_t)kv.vlen);
216 for (
int i = 0; i < kv.vlen; i++) {
217 if (filename[i] ==
'\0') {
221 filename[kv.vlen - 1] =
'\0';
222 fprintf(stderr,
";;KMR IOLB [%05d]: Rank[%05d] - File[%s]\n",
223 rank, rank, filename);
232 kmr_iolb_value_is_equal(
const struct kmr_kv_box kv1,
235 if (kv1.vlen != kv2.vlen) {
238 for (
int i = 0; i < kv1.vlen; i++) {
239 if (kv1.v.p[i] != kv2.v.p[i]) {
261 struct kmr_option kmr_supported = {.inspect = 1, .take_ckpt = 1};
262 kmr_check_fn_options(mr, kmr_supported, opt, __func__);
265 if (!opt.keep_open) {
275 _Bool tracing5 = (mr->trace_iolb && (5 <= mr->verbosity));
279 int rank_xy = kmr_iolb_calc_xy_of_rank(mr->rank);
281 struct kmr_kv_box kv_rnk = { .klen =
sizeof(long),
282 .vlen =
sizeof(
long),
286 assert(cc == MPI_SUCCESS);
288 assert(cc == MPI_SUCCESS);
291 assert(cc == MPI_SUCCESS);
293 fprintf(stderr,
";;KMR IOLB [%05d]: Group[%d] - Rank[%05d]\n",
294 mr->rank, rank_xy, mr->rank);
301 assert(valf == KMR_KV_OPAQUE || valf == KMR_KV_CSTRING);
305 assert(cc == MPI_SUCCESS);
308 cc =
kmr_map(kvs_each_file, kvs_fileloc_each, NULL, kmr_noopt,
309 kmr_iolb_find_file_location_k);
310 assert(cc == MPI_SUCCESS);
312 cc =
kmr_shuffle(kvs_fileloc_each, kvs_fileloc, kmr_noopt);
313 assert(cc == MPI_SUCCESS);
317 long nranks, nfilelocs;
319 assert(cc == MPI_SUCCESS);
321 assert(cc == MPI_SUCCESS);
324 if (nranks > 0 && nfilelocs > 0) {
328 assert(cc == MPI_SUCCESS);
332 assert(cc == MPI_SUCCESS);
334 int n = (int)(nfilelocs / nranks);
335 int r = (int)(nfilelocs % nranks);
336 int asgn_cnt = (n == 0)? r : (
int)nranks;
338 for (
int i = 0; i < asgn_cnt; i++) {
339 long t_rank = kvs_ary1[i].v.i;
340 int cnt = n + ((i < r)? 1 : 0);
341 for (
int j = 0; j < cnt; j++) {
342 char *t_file = (
char *)kvs_ary2[assigned + j].v.p;
343 int t_file_siz = kvs_ary2[assigned + j].vlen;
344 struct kmr_kv_box nkv = { .klen =
sizeof(
long),
347 .v.p = (
char *)t_file };
363 assert(nfilelocs <= 0);
371 assert(cc == MPI_SUCCESS);
373 cc =
kmr_map(kvs_myfile, NULL, NULL, inspect,
374 kmr_iolb_print_assigned_files);
380 assert(cc == MPI_SUCCESS);
383 assert(cc == MPI_SUCCESS);
387 assert(cc == MPI_SUCCESS);
389 assert(cc == MPI_SUCCESS);
393 assert(cc == MPI_SUCCESS);
394 KMR_OMP_PARALLEL_FOR_
395 for (
int i = 0; i < nkvo; i++) {
397 for (
int j = 0; j < nkvi_all; j++) {
398 if (kmr_iolb_value_is_equal(kvs_ary1[j], kvs_ary2[i])) {
408 assert(kvs_myfile->c.element_count == kvo->c.element_count);
414 if (!opt.keep_open) {
int kmr_ckpt_progress_init(KMR_KVS *, KMR_KVS *, struct kmr_option)
It initializes the progress of MapReduce checkpointing.
Key-Value Stream (abstract).
Utilities Private Part (do not include from applications).
Lustre Striping Information with OBDIDX.
int kmr_assign_file(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Assigns files to ranks based on data locality.
Options to Mapping, Shuffling, and Reduction.
void kmr_ckpt_progress_fin(KMR *)
It finalizes the progress of MapReduce checkpointing.
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
int kmr_ckpt_enabled(KMR *)
Checks if checkpoint/restart is enabled.
int kmr_copy_to_array_fn(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long i)
Copies the entry in the array.
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
void kmr_ckpt_save_kvo_whole(KMR *, KMR_KVS *)
It saves all key-value pairs in the output KVS to a checkpoint data file.
kmr_kv_field
Datatypes of Keys or Values.
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Handy Copy of a Key-Value Field.
int kmr_fefs_get_stripe(const char *dir, const char *file, struct kmr_fefs_stripe *stripe, int *err, _Bool debug_and_dump)
Gets the OBDIDX information on the file or directory.
int kmr_ckpt_disable_ckpt(KMR *)
It temporarily disables checkpoint/restart.
Lustre File System (or Fujitsu FEFS) Support.
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gather.
int kmr_local_element_count(KMR_KVS *kvs, long *v)
Gets the number of key-value pairs locally on each rank.
int kmr_ckpt_enable_ckpt(KMR *, int)
It temporarily enables checkpoint/restart that has been disabled by calling kmr_ckpt_disable_ckpt().