18 #include <sys/types.h> 23 #if defined(__sparc) && defined(__HPC_ACE__) 37 typedef unsigned char _Bool;
42 #define KMR_API_ID0(X) KMR_API_ID1(X) 43 #define KMR_API_ID1(X) kmr_api_ ## X 44 #define KMR_API_ID KMR_API_ID0(KMR_H) 46 extern int KMR_API_ID;
47 extern const int kmr_version;
51 #define KMR_TAG_SPAWN_REPLY 500 52 #define KMR_TAG_SPAWN_REPLY1 501 60 #define KMR_JOB_NAME_LEN 256 65 #define kmr_create_kvsx(MR,KF,VF,OPT) \ 66 kmr_create_kvs7((MR), (KF), (VF), (OPT), __FILE__, __LINE__, __func__) 71 #define kmr_create_kvs(MR,KF,VF) \ 72 kmr_create_kvs7((MR), (KF), (VF), kmr_noopt, __FILE__, __LINE__, __func__) 76 #define kmr_create_kvs_(MR,IGNORE) \ 77 kmr_create_kvs7((MR), KMR_KV_BAD, KMR_KV_BAD, kmr_noopt, \ 78 __FILE__, __LINE__, __func__) 82 #define kmr_map(KVI, KVO, ARG, OPT, M) \ 83 kmr_map9(0, (KVI), (KVO), (ARG), (OPT), (M), \ 84 __FILE__, __LINE__, __func__) 88 #define kmr_reduce(KVI, KVO, ARG, OPT, R) \ 89 kmr_reduce9(0, (KVI), (KVO), (ARG), (OPT), (R), \ 90 __FILE__, __LINE__, __func__) 229 long ckpt_kvs_id_counter;
238 MPI_Comm **spawn_comms;
240 long mapper_park_size;
241 size_t preset_block_size;
242 size_t malloc_overhead;
248 long sort_sample_factor;
249 int sort_threads_depth;
251 long file_io_block_size;
253 char *kmr_installation_path;
254 char *spawn_watch_program;
255 char *spawn_watch_prefix;
256 char *spawn_watch_host_name;
257 int spawn_max_processes;
259 int spawn_watch_port_range[2];
260 int spawn_gap_msec[2];
261 int spawn_watch_accept_onhold_msec;
263 size_t pushoff_block_size;
264 int pushoff_poll_rate;
269 _Bool single_thread : 1;
270 _Bool one_step_sort : 1;
272 _Bool trace_sorting : 1;
273 _Bool trace_file_io : 1;
274 _Bool trace_map_ms : 1;
275 _Bool trace_map_spawn : 1;
276 _Bool trace_alltoall : 1;
277 _Bool trace_kmrdp : 1;
278 _Bool trace_iolb : 1;
280 _Bool file_io_dummy_striping : 1;
281 _Bool file_io_always_alltoallv : 1;
282 _Bool spawn_sync_at_startup : 1;
283 _Bool spawn_watch_all : 1;
284 _Bool spawn_disconnect_early : 1;
285 _Bool spawn_disconnect_but_free : 1;
286 _Bool spawn_pass_intercomm_in_argument : 1;
287 _Bool keep_fds_at_fork : 1;
289 _Bool mpi_thread_support : 1;
291 _Bool ckpt_enable : 1;
292 _Bool ckpt_selective : 1;
293 _Bool ckpt_no_fsync : 1;
295 _Bool stop_at_some_check_globally : 1;
297 _Bool pushoff_hang_out : 1;
298 _Bool pushoff_fast_notice : 1;
299 _Bool pushoff_stat : 1;
304 } pushoff_statistics;
306 _Bool kmrviz_trace : 1;
308 char identifying_name[KMR_JOB_NAME_LEN];
331 KMR_KV_POINTER_OWNED,
332 KMR_KV_POINTER_UNMANAGED
365 #define kmr_kv_cake kmr_kv_box 395 KMR_KVS_ONCORE_PACKED
399 #define KMR_KVS_MAGIC_OK(X) \ 400 ((X) == KMR_KVS_ONCORE || (X) == KMR_KVS_ONCORE_PACKED \ 401 || (X) == KMR_KVS_PUSHOFF) 459 enum kmr_kvs_magic magic;
471 _Bool shuffled_in_pushoff : 1;
472 _Bool _uniformly_sized_ : 1;
476 long ckpt_generated_op;
477 long ckpt_consumed_op;
480 size_t element_size_limit;
482 size_t storage_netsize;
490 _Bool under_threaded_operation;
493 void *temporary_data;
511 long partial_element_count;
518 static const size_t kmr_kvs_block_header = offsetof(
struct kmr_kvs_block, data);
543 enum kmr_kvs_magic magic;
555 _Bool shuffled_in_pushoff : 1;
556 _Bool _uniformly_sized_ : 1;
563 MPI_Status *statuses;
569 enum kmr_kvs_magic magic;
581 _Bool shuffled_in_pushoff : 1;
582 _Bool _uniformly_sized_ : 1;
614 _Bool nothreading : 1;
617 _Bool key_as_rank : 1;
628 static const struct kmr_option kmr_noopt = {0, 0, 0, 0, 0, 0, 0};
629 static const union {
struct kmr_option o;
unsigned long bits;}
630 kmr_optmask = {{1, 1, 1, 1, 1, 1, 1}};
640 _Bool subdirectories : 1;
642 _Bool shuffle_names : 1;
651 kmr_foptmask = {{1, 1, 1, 1}};
663 _Bool separator_space : 1;
664 _Bool reply_each : 1;
665 _Bool reply_root : 1;
666 _Bool one_by_one : 1;
676 kmr_soptmask = {{1, 1, 1, 1, 1}};
714 union {MPI_Comm icomm;
long i_;} u;
734 unsigned short index;
735 unsigned short len[1];
747 #define kmr_init() kmr_init_2(KMR_API_ID) 749 extern int kmr_init_2(
int ignore);
754 extern KMR *kmr_get_context_of_kvs(
KMR_KVS const *kvs);
759 const char *,
const int,
const char *);
768 void **keyp,
void **valuep);
773 extern int kmr_map9(_Bool stop_when_some_added,
776 const char *,
const int,
const char *);
778 _Bool stop_when_some_added,
817 extern KMR *kmr_create_context_world(
void);
818 extern KMR *kmr_create_dummy_context(
void);
825 #define kmr_sort_a_batch(X0,X1,X2,X3) kmr_sort_locally(X0,X1,X2,X3) 836 const char *,
const int,
const char *);
851 extern int kmr_map_getline_in_memory_(
KMR *mr,
void *b,
size_t sz,
long limit,
904 off_t offset, off_t bytes,
905 void **buffer, off_t *readsize);
907 void **buffer, off_t *readsize);
913 long n, _Bool shuffling, _Bool ranking);
917 char *buf,
int buflen);
920 extern void kmr_dump_opaque(
const char *p,
int siz,
char *buf,
int buflen);
926 const void *v,
const int vlen);
940 int markers[2], _Bool disallow_other_entries);
944 int slots[][2],
int nslots,
945 int keys[][2],
int nkeys);
950 const char*,
const int,
const char*);
951 extern void kmr_print_statistics_on_pushoff(
KMR *mr,
char *titlestring);
953 extern void kmr_fin_pushoff_fast_notice_(
void);
961 kmr_dummy_dummy_dummy_(
void)
964 (void)kmr_kvs_block_header;
int kmr_retrieve_keyed_records(KMR_KVS *kvs, struct kmr_keyed_record *ev, long n, _Bool shuffling, _Bool ranking)
Fills keyed records in an array for sorting.
int kmr_map_serial_processes(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, MPI_Info info, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
Maps on processes started by MPI_Comm_spawn() to run serial processes.
Key-Value Stream (abstract).
int kmr_map_for_some(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps until some key-value are added.
int kmr_map_file_names(KMR *mr, char **names, int n, struct kmr_file_option fopt, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps on file names.
Options to Mapping, Shuffling, and Reduction.
int kmr_file_enumerate(KMR *mr, char *names[], int n, KMR_KVS *kvo, struct kmr_file_option fopt)
Adds file names in a key-value stream KVO.
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
int kmr_unpairing(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Extracts a key-value pair from a pairing in the value part, discarding the original key...
KMR_KVS * kmr_create_pushoff_kvs(KMR *mr, enum kmr_kv_field kf, enum kmr_kv_field vf, struct kmr_option opt, const char *, const int, const char *)
Makes a new key-value stream with the specified field data-types.
void kmr_reset_ntuple(struct kmr_ntuple *u, int n, int marker)
Resets an n-tuple U with N entries and a MARKER.
int kmr_size_ntuple_by_lengths(int n, int len[])
Returns the storage size of an n-tuple for N entries with LEN[i] size each.
int kmr_find_key(KMR_KVS *kvi, struct kmr_kv_box ki, struct kmr_kv_box *vo)
Finds a key-value pair for a key.
int kmr_put_ntuple(KMR *mr, struct kmr_ntuple *u, const int sz, const void *v, const int vlen)
Adds an entry V with LEN in an n-tuple U whose size is limited to SIZE.
int kmr_map_once(KMR_KVS *kvo, void *arg, struct kmr_option opt, _Bool rank_zero_only, kmr_mapfn_t m)
Maps once.
int kmr_map9(_Bool stop_when_some_added, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m, const char *, const int, const char *)
Maps simply.
int kmr_scan_locally(KMR_KVS *kvi, KMR_KVS *carryin, KMR_KVS *kvo, KMR_KVS *carryout, kmr_redfn_t r)
Scans every key-value with a reduce-function locally (independently on each rank).
int kmr_save_kvs(KMR_KVS *kvi, void **dataq, size_t *szq, struct kmr_option opt)
Packs locally the contents of a key-value stream to a byte array.
MPI_Comm * kmr_get_spawner_communicator(KMR *mr, long index)
Obtains (a reference to) a parent inter-communicator of a spawned process.
struct kmr_ntuple_entry kmr_nth_ntuple(struct kmr_ntuple *u, int nth)
Returns an NTH entry of an n-tuple.
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks.
static const size_t kmr_kvs_entry_header
Size of an Entry Header.
int kmr_send_kvs_to_spawner(KMR *mr, KMR_KVS *kvs)
Sends the KVS from a spawned process to the map-function of the spawner.
int kmr_copy_to_array_fn(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long i)
Copies the entry in the array.
int kmr_retrieve_kvs_entries(KMR_KVS *kvs, struct kmr_kvs_entry **ev, long n)
Fills local key-value entries in an array for inspection.
int kmr_reduce9(_Bool stop_when_some_added, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_redfn_t r, const char *, const int, const char *)
Reduces key-value pairs.
Keyed-Record for Sorting.
int kmr_distribute(KMR_KVS *kvi, KMR_KVS *kvo, _Bool cyclic, struct kmr_option opt)
Distributes key-values so that each rank has approximately the same number of pairs.
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
int kmr_reply_to_spawner(KMR *mr)
Sends a reply message in the spawned process, which tells it is ready to finish and may have some dat...
int kmr_add_ntuple(KMR_KVS *kvo, void *k, int klen, struct kmr_ntuple *u)
Adds an n-tuple U with a given key K and KLEN in a key-value stream KVO.
int kmr_sort_by_one(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Sort by rank0, a degenerated case for small number of keys.
int kmr_add_kv1(KMR_KVS *kvs, void *k, int klen, void *v, int vlen)
Adds a key-value pair as given directly by a pointer.
int kmr_concatenate_kvs(KMR_KVS *kvs[], int nkvs, KMR_KVS *kvo, struct kmr_option opt)
Concatenates a number of KVSes to one.
int kmr_add_kv_space(KMR_KVS *kvs, const struct kmr_kv_box kv, void **keyp, void **valuep)
Adds a key-value pair, but only allocates a space and returns the pointers to the key and the value p...
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS).
int kmr_scan_on_values(KMR_KVS *kvi, KMR_KVS *kvo, KMR_KVS *total, kmr_redfn_t r)
Prefix-scans every key-value with a reduce-function (non-self-inclusively) and generates the final va...
int kmr_map_parallel_processes(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, MPI_Info info, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
Maps on processes started by MPI_Comm_spawn() to run independent MPI processes, which will not commun...
int kmr_find_string(KMR_KVS *kvi, const char *k, const char **vq)
Finds the key K in the key-value stream KVS.
void kmr_check_pushoff_fast_notice_(KMR *mr)
Check if fast-notice works.
int kmr_assign_file(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Assigns files to ranks based on data locality.
int kmr_copy_kvs_to_info(KMR_KVS *kvi, MPI_Info dst)
Copies kvs entires into mpi-info.
int kmr_map_getline(KMR *mr, FILE *f, long limit, _Bool largebuffering, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Calls a map-function M for each line by getline() on an input F.
int kmr_read_file_by_segments(KMR *mr, char *file, int color, void **buffer, off_t *readsize)
Reads one file by segments and reassembles by all-gather.
kmr_kv_field
Datatypes of Keys or Values.
int kmr_reduce_for_some(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_redfn_t r)
Reduces until some key-value are added.
int kmr_take_one(KMR_KVS *kvi, struct kmr_kv_box *kv)
Extracts a single key-value pair locally in the key-value stream KVI.
int kmr_pairing(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replaces a value part with a key-value pairing.
Handy Copy of a Key-Value Field.
int kmr_sort_large(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Sorts a key-value stream by the regular or the random sampling-sort.
void kmr_init_pushoff_fast_notice_(MPI_Comm, _Bool verbose)
Initializes RDMA for fast-notice.
int kmr_dump_kv(struct kmr_kv_box kv, const KMR_KVS *kvs, char *buf, int buflen)
Dumps contents of a key-value.
int kmr_dump_kvs(KMR_KVS *kvs, int flag)
Dumps contents of a key-value stream to stdout.
Options to Mapping by Spawns.
int kmr_fin(void)
Clears the environment.
Key-Value Stream with Shuffling at Addition of Key-Values.
int kmr_get_element_count(KMR_KVS *kvs, long *v)
Gets the total number of key-value pairs.
int kmr_move_kvs(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Moves the contents of the input KVI to the output KVO.
int kmr_sort(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Sorts a key-value stream globally.
int kmr_dump_kvs_stats(KMR_KVS *, int level)
Dumps contents of a key-value stream, with values are pairs.
int kmr_size_ntuple(struct kmr_ntuple *u)
Returns the storage size of an n-tuple.
int kmr_separate_ntuples(KMR *mr, const struct kmr_kv_box kv[], const long n, struct kmr_ntuple **vv[2], long cnt[2], int markers[2], _Bool disallow_other_entries)
Separates the n-tuples stored in the value part of KV into the two sets by their marker values...
State during kmr_map_ms().
int kmr_put_ntuple_long(KMR *mr, struct kmr_ntuple *u, const int sz, long v)
Adds an integer value in an n-tuple U whose size is limited to SIZE.
int kmr_match(KMR_KVS *kvi0, KMR_KVS *kvi1, KMR_KVS *kvo, struct kmr_option opt)
Makes key-value pairs as products of the two values in two key-value stream.
int kmr_map_skipping(long from, long stride, long limit, _Bool stop_when_some_added, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps by skipping the number of entries.
int kmr_product_ntuples(KMR_KVS *kvo, struct kmr_ntuple **vv[2], long cnt[2], int newmarker, int slots[][2], int nslots, int keys[][2], int nkeys)
Makes a direct product of the two sets of n-tuples VV[0] and VV[1] with their counts in CNT[0] and CN...
void kmr_dump_opaque(const char *p, int siz, char *buf, int buflen)
Puts the string of the key or value field into a buffer BUF as printable string.
int kmr_map_ms_commands(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, struct kmr_spawn_option sopt, kmr_mapfn_t m)
Maps in master-slave mode, specialized to run serial commands.
int kmr_shuffle_leveling_pair_count(KMR_KVS *kvi, KMR_KVS *kvo)
Shuffles key-values so that each rank has approximately the same number of pairs. ...
int kmr_read_files_reassemble(KMR *mr, char *file, int color, off_t offset, off_t bytes, void **buffer, off_t *readsize)
Reassembles files reading by ranks.
int kmr_map_via_spawn(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, MPI_Info info, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
Maps on processes started by MPI_Comm_spawn().
int kmr_histogram_count_by_ranks(KMR_KVS *kvs, long *frq, double *var, _Bool rankzeroonly)
Fills an integer array FRQ[i] with the count of the elements of each rank.
int kmr_ranking(KMR_KVS *kvi, KMR_KVS *kvo, long *count, struct kmr_option opt)
Assigns a ranking to key-value pairs, and returns the number of the total elements in COUNT...
int kmr_restore_kvs(KMR_KVS *kvo, void *data, size_t sz, struct kmr_option opt)
Unpacks locally the contents of a key-value stream from a byte array.
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context().
int kmr_add_identity_fn(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long i)
Adds a given key-value pair unmodified.
int kmr_map_ms(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps in master-slave mode.
int kmr_map_rank_by_rank(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps sequentially with rank by rank for debugging.
int kmr_legal_minimum_field_size(KMR *mr, enum kmr_kv_field f)
Returns a minimum byte size of the field: 8 for INTEGER and FLOAT8, 0 for others. ...
Options to Mapping on Files.
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gat...
int kmr_sort_small(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Sorts a key-value stream, by partitioning to equal ranges.
int kmr_add_string(KMR_KVS *kvs, const char *k, const char *v)
Adds a key-value pair of strings.
int kmr_put_ntuple_entry(KMR *mr, struct kmr_ntuple *u, const int sz, struct kmr_ntuple_entry e)
Adds an n-tuple entry E in an n-tuple U whose size is limited to SIZE.
Key-Value Stream (DUMMY); Mandatory Entries.
int kmr_receive_kvs_from_spawned_fn(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long index)
Collects key-value pairs generated by spawned processes.
int kmr_sort_locally(KMR_KVS *kvi, KMR_KVS *kvo, _Bool shuffling, struct kmr_option opt)
Reorders key-value pairs in a single rank.
int kmr_reduce_as_one(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_redfn_t r)
Calls a reduce-function once as if all key-value pairs had the same key.
int(* kmr_redfn_t)(const struct kmr_kv_box kv[], const long n, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg)
Reduce-function Type.
int kmr_local_element_count(KMR_KVS *kvs, long *v)
Gets the number of key-value pairs locally on each rank.
KMR_KVS * kmr_create_kvs7(KMR *mr, enum kmr_kv_field k, enum kmr_kv_field v, struct kmr_option opt, const char *, const int, const char *)
Makes a new key-value stream with the specified field data-types.
int kmr_reverse(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Makes a new pair by swapping the key and the value in each pair.
Information of Source Code Line.
int(* kmr_mapfn_t)(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long index)
Map-function Type.
int kmr_map_on_rank_zero(KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps on rank0 only.
Record of Push-Off Key-Value Stream for a Rank.
int kmr_copy_info_to_kvs(MPI_Info src, KMR_KVS *kvo)
Copies mpi-info entires into kvs.
int kmr_choose_first_part(KMR_KVS *kvi, KMR_KVS *kvo, long n, struct kmr_option opt)
Chooses the first N entries from a key-value stream KVI.
int kmr_map_processes(_Bool nonmpi, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, MPI_Info info, struct kmr_spawn_option opt, kmr_mapfn_t mapfn)
Maps on processes started by MPI_Comm_spawn() to run independent processes.
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).