13 #include "../config.h"    24 #define MAX_IO_GRPS 432      35 kmr_iolb_calc_xy_of_rank(
int rank)
    38     int cc = FJMPI_Topology_sys_rank2xyzabc(rank, &x, &y, &z, &a, &b, &c);
    39     assert(cc == MPI_SUCCESS);
    47 kmr_iolb_load_stripe(
const char *filename,
    48                      struct io_grp *grps, 
int *grps_cnt)
    50     size_t len = strlen(filename);
    51     char *path = (
char *)malloc((len + 1) * 
sizeof(char));
    52     memcpy(path, filename, (len + 1));
    55     for (
char *p = &path[len - 1]; p >= path; p--) {
    73     for (
int i = 0; i < stripe.s.count; i++) {
    74         int x = stripe.obdidx[i] / 2048;
    75         int y = (stripe.obdidx[i] % 2048) / 64;
    77         for (
int j = 0; j < *grps_cnt; j++) {
    78             if (x == grps[j].gid_x && y == grps[j].gid_y) {
    85             grps[*grps_cnt].gid_x = x;
    86             grps[*grps_cnt].gid_y = y;
    87             grps[*grps_cnt].count = 1;
    97 kmr_iolb_shift_grps(
struct io_grp *grps, 
int grps_cnt,
    98                     int *shift_x, 
int *shift_y)
   100     int xs[K_MAX_X] = { 0 };
   101     int ys[K_MAX_Y] = { 0 };
   102     for (
int i = 0; i < grps_cnt; i++) {
   103         xs[grps[i].gid_x] = 1;
   104         ys[grps[i].gid_y] = 1;
   106     if (xs[0] == 1 && xs[K_MAX_X - 1] == 1) {
   107         for (
int i = K_MAX_X - 1; i >= 0; i--) {
   115         for (
int i = 0; i < grps_cnt; i++) {
   116             grps[i].gid_x = (grps[i].gid_x + *shift_x) % K_MAX_X;
   119     if (ys[0] == 1 && ys[K_MAX_Y - 1] == 1) {
   120         for (
int i = K_MAX_Y - 1; i >= 0; i--) {
   128         for (
int i = 0; i < grps_cnt; i++) {
   129             grps[i].gid_y = (grps[i].gid_y + *shift_y) % K_MAX_Y;
   138 kmr_iolb_unshift_xy(
int *x, 
int *y, 
int shift_x, 
int shift_y)
   141         int sub_x = *x - shift_x;
   142         *x = (sub_x < 0)? sub_x + K_MAX_X : sub_x;
   145         int sub_y = *y - shift_y;
   146         *y = (sub_y < 0)? sub_y + K_MAX_Y : sub_y;
   156                               KMR_KVS *kvo, 
void *p, 
long i_)
   158     struct io_grp grps[MAX_IO_GRPS];
   160     char *p1 = (
char *)kv.v.p;
   161     for (
char *p2 = p1; p2 < kv.v.p + kv.vlen; p2++) {
   163             kmr_iolb_load_stripe(p1, grps, &grps_cnt);
   167     int shift_x = 0, shift_y = 0;
   168     kmr_iolb_shift_grps(grps, grps_cnt, &shift_x, &shift_y);
   169     int x_sum = 0, y_sum = 0, cnt_sum = 0;
   170     for (
int i = 0; i < grps_cnt; i++) {
   171         x_sum += grps[i].gid_x * grps[i].count;
   172         y_sum += grps[i].gid_y * grps[i].count;
   173         cnt_sum += grps[i].count;
   175     int x = (int)ceil((
double)x_sum / cnt_sum);
   176     int y = (int)ceil((
double)y_sum / cnt_sum);
   177     kmr_iolb_unshift_xy(&x, &y, shift_x, shift_y);
   179     int file_xy = (x << 16) + y;
   180     struct kmr_kv_box nkv = { .klen = 
sizeof(long),
   185     assert(cc == MPI_SUCCESS);
   187     _Bool tracing5 = (kvi->c.mr->trace_iolb && (5 <= kvi->c.mr->verbosity));
   189         char *filename = (
char *)malloc((
size_t)kv.vlen * 
sizeof(char));
   190         memcpy(filename, kv.v.p, (
size_t)kv.vlen);
   191         for (
int i = 0; i < kv.vlen; i++) {
   192             if (filename[i] == 
'\0') {
   196         filename[kv.vlen - 1] = 
'\0';
   198                 ";;KMR IOLB [%05d]: Group[%d] - File[%s]\n",
   199                 kvi->c.mr->rank, file_xy, filename);
   211                               KMR_KVS *kvo, 
void *p, 
long i_)
   213     int rank = (int)kv.k.i;
   214     char *filename = (
char *)malloc((
size_t)kv.vlen * 
sizeof(
char));
   215     memcpy(filename, kv.v.p, (
size_t)kv.vlen);
   216     for (
int i = 0; i < kv.vlen; i++) {
   217         if (filename[i] == 
'\0') {
   221     filename[kv.vlen - 1] = 
'\0';
   222     fprintf(stderr, 
";;KMR IOLB [%05d]: Rank[%05d] - File[%s]\n",
   223             rank, rank, filename);
   232 kmr_iolb_value_is_equal(
const struct kmr_kv_box kv1,
   235     if (kv1.vlen != kv2.vlen) {
   238     for (
int i = 0; i < kv1.vlen; i++) {
   239         if (kv1.v.p[i] != kv2.v.p[i]) {
   261     struct kmr_option kmr_supported = {.inspect = 1, .take_ckpt = 1};
   262     kmr_check_fn_options(mr, kmr_supported, opt, __func__);
   265             if (!opt.keep_open) {
   275     _Bool tracing5 = (mr->trace_iolb && (5 <= mr->verbosity));
   279     int rank_xy = kmr_iolb_calc_xy_of_rank(mr->rank);
   281     struct kmr_kv_box kv_rnk = { .klen = 
sizeof(long),
   282                                  .vlen = 
sizeof(
long),
   286     assert(cc == MPI_SUCCESS);
   288     assert(cc == MPI_SUCCESS);
   291     assert(cc == MPI_SUCCESS);
   293         fprintf(stderr, 
";;KMR IOLB [%05d]: Group[%d] - Rank[%05d]\n",
   294                 mr->rank, rank_xy, mr->rank);
   301     assert(valf == KMR_KV_OPAQUE || valf == KMR_KV_CSTRING);
   305     assert(cc == MPI_SUCCESS);
   308     cc = 
kmr_map(kvs_each_file, kvs_fileloc_each, NULL, kmr_noopt,
   309                  kmr_iolb_find_file_location_k);
   310     assert(cc == MPI_SUCCESS);
   312     cc = 
kmr_shuffle(kvs_fileloc_each, kvs_fileloc, kmr_noopt);
   313     assert(cc == MPI_SUCCESS);
   317     long nranks, nfilelocs;
   319     assert(cc == MPI_SUCCESS);
   321     assert(cc == MPI_SUCCESS);
   324     if (nranks > 0 && nfilelocs > 0) {
   328         assert(cc == MPI_SUCCESS);
   332         assert(cc == MPI_SUCCESS);
   334         int n = (int)(nfilelocs / nranks);
   335         int r = (int)(nfilelocs % nranks);
   336         int asgn_cnt = (n == 0)? r : (
int)nranks;
   338         for (
int i = 0; i < asgn_cnt; i++) {
   339             long t_rank = kvs_ary1[i].v.i;
   340             int cnt = n + ((i < r)? 1 : 0);
   341             for (
int j = 0; j < cnt; j++) {
   342                 char *t_file = (
char *)kvs_ary2[assigned + j].v.p;
   343                 int t_file_siz = kvs_ary2[assigned + j].vlen;
   344                 struct kmr_kv_box nkv = { .klen = 
sizeof(
long),
   347                                           .v.p  = (
char *)t_file };
   363         assert(nfilelocs <= 0);
   371     assert(cc == MPI_SUCCESS);
   373         cc = 
kmr_map(kvs_myfile, NULL, NULL, inspect,
   374                      kmr_iolb_print_assigned_files);
   380     assert(cc == MPI_SUCCESS);
   383     assert(cc == MPI_SUCCESS);
   387     assert(cc == MPI_SUCCESS);
   389     assert(cc == MPI_SUCCESS);
   393     assert(cc == MPI_SUCCESS);
   394     KMR_OMP_PARALLEL_FOR_
   395         for (
int i = 0; i < nkvo; i++) {
   397             for (
int j = 0; j < nkvi_all; j++) {
   398                 if (kmr_iolb_value_is_equal(kvs_ary1[j], kvs_ary2[i])) {
   408     assert(kvs_myfile->c.element_count == kvo->c.element_count);
   414     if (!opt.keep_open) {
 int kmr_ckpt_progress_init(KMR_KVS *, KMR_KVS *, struct kmr_option)
It initializes the progress of MapReduce checkpointing. 
 
Key-Value Stream (abstract). 
 
Utilities Private Part (do not include from applications). 
 
Lustre Striping Information with OBDIDX. 
 
int kmr_assign_file(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Assigns files to ranks based on data locality. 
 
Options to Mapping, Shuffling, and Reduction. 
 
void kmr_ckpt_progress_fin(KMR *)
It finalizes the progress of MapReduce checkpointing. 
 
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair. 
 
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes. 
 
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks. 
 
int kmr_ckpt_enabled(KMR *)
Checks whether checkpoint/restart is enabled. 
 
int kmr_copy_to_array_fn(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long i)
Copies the entry in the array. 
 
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs. 
 
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS). 
 
void kmr_ckpt_save_kvo_whole(KMR *, KMR_KVS *)
It saves all key-value pairs in the output KVS to a checkpoint data file. 
 
kmr_kv_field
Datatypes of Keys or Values. 
 
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply. 
 
Handy Copy of a Key-Value Field. 
 
int kmr_fefs_get_stripe(const char *dir, const char *file, struct kmr_fefs_stripe *stripe, int *err, _Bool debug_and_dump)
Gets the OBDIDX information on the file or directory. 
 
int kmr_ckpt_disable_ckpt(KMR *)
It temporarily disables checkpoint/restart. 
 
Lustre File System (or Fujitsu FEFS) Support. 
 
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gather.
 
int kmr_local_element_count(KMR_KVS *kvs, long *v)
Gets the number of key-value pairs locally on each rank. 
 
int kmr_ckpt_enable_ckpt(KMR *, int)
It temporarily enables checkpoint/restart, which has been disabled by calling kmr_ckpt_disable_ckpt().