KMR
|
KMR Base Implementation (on-memory operations). More...
#include <mpi.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <limits.h>
#include <errno.h>
#include <assert.h>
#include <ctype.h>
#include "../config.h"
#include "kmr.h"
#include "kmrimpl.h"
#include "kmrtrace.h"
Go to the source code of this file.
Functions | |
int | kmr_add_identity_fn (const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long i) |
Adds a given key-value pair unmodified. More... | |
int | kmr_add_kv (KMR_KVS *kvs, const struct kmr_kv_box kv) |
Adds a key-value pair. More... | |
int | kmr_add_kv1 (KMR_KVS *kvs, void *k, int klen, void *v, int vlen) |
Adds a key-value pair as given directly by a pointer. More... | |
int | kmr_add_kv_done (KMR_KVS *kvs) |
Marks finished adding key-value pairs. More... | |
static int | kmr_add_kv_nomutex (KMR_KVS *kvs, const struct kmr_kv_box kv, struct kmr_kv_box *xkv, _Bool reserve_space_only) |
int | kmr_add_kv_quick_ (KMR_KVS *kvs, const struct kmr_kv_box kv) |
int | kmr_add_kv_space (KMR_KVS *kvs, const struct kmr_kv_box kv, void **keyp, void **valuep) |
Adds a key-value pair, but only allocates a space and returns the pointers to the key and the value parts. More... | |
int | kmr_add_string (KMR_KVS *kvs, const char *k, const char *v) |
Adds a key-value pair of strings. More... | |
int | kmr_allocate_block (KMR_KVS *kvs, size_t size) |
static void | kmr_assert_on_tail_marker (KMR_KVS *kvs) |
static kmr_record_sorter_t | kmr_choose_record_sorter (const KMR_KVS *kvs) |
kmr_sorter_t | kmr_choose_sorter (const KMR_KVS *kvs) |
static int | kmr_collapse_as_opaque (KMR_KVS *kvi, KMR_KVS *kvo, _Bool inspectp) |
static int | kmr_compare_float8 (const struct kmr_kv_box *p0, const struct kmr_kv_box *p1) |
static int | kmr_compare_integer (const struct kmr_kv_box *p0, const struct kmr_kv_box *p1) |
static int | kmr_compare_lexicographically (const unsigned char *p, const int plen, const unsigned char *q, const int qlen) |
static int | kmr_compare_opaque (const struct kmr_kv_box *p, const struct kmr_kv_box *q) |
static int | kmr_compare_record_float8_ (const struct kmr_keyed_record *p0, const struct kmr_keyed_record *p1) |
static int | kmr_compare_record_integer_ (const struct kmr_keyed_record *p0, const struct kmr_keyed_record *p1) |
static int | kmr_compare_record_opaque (const struct kmr_keyed_record *p0, const struct kmr_keyed_record *p1) |
int | kmr_concatenate_kvs (KMR_KVS *kvs[], int nkvs, KMR_KVS *kvo, struct kmr_option opt) |
Concatenates a number of KVSes to one. More... | |
static int | kmr_copy_record_fn (const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *p, const long i) |
static long | kmr_count_entries (KMR_KVS *kvs, _Bool bound_in_block) |
KMR * | kmr_create_context (const MPI_Comm comm, const MPI_Info conf, const char *identifying_name) |
Makes a new KMR context (a context has type KMR). More... | |
KMR * | kmr_create_context_world () |
KMR_KVS * | kmr_create_kvs7 (KMR *mr, enum kmr_kv_field kf, enum kmr_kv_field vf, struct kmr_option opt, const char *file, const int line, const char *func) |
Makes a new key-value stream with the specified field data-types. More... | |
static KMR_KVS * | kmr_create_raw_kvs (KMR *mr, const KMR_KVS *_similar) |
Makes a new key-value stream (type KMR_KVS). More... | |
int | kmr_fin (void) |
Clears the environment. More... | |
struct kmr_kvs_entry * | kmr_find_kvs_last_entry (KMR_KVS *kvs) |
Finds the last entry of a key-value stream. More... | |
int | kmr_free_context (KMR *mr) |
Releases a context created with kmr_create_context(). More... | |
int | kmr_free_kvs (KMR_KVS *kvs) |
Releases a key-value stream (type KMR_KVS). More... | |
static int | kmr_free_kvs_oncore (KMR_KVS *kvs) |
KMR * | kmr_get_context_of_kvs (KMR_KVS const *kvs) |
static signed long | kmr_hash_key (const KMR_KVS *kvs, const struct kmr_kv_box kv) |
static unsigned long | kmr_hash_key_opaque (const unsigned char *p, int n) |
static int | kmr_icmp (const void *a0, const void *a1) |
Compares the key field of keyed-records for qsort/bsearch. More... | |
int | kmr_init_2 (int ignore) |
void | kmr_init_kvs_oncore (KMR_KVS *kvs, KMR *mr) |
static void | kmr_kvs_adjust_adding_point (KMR_KVS *kvs) |
int | kmr_legal_minimum_field_size (KMR *mr, enum kmr_kv_field f) |
Returns a minimum byte size of the field: 8 for INTEGER and FLOAT8, 0 for others. More... | |
int | kmr_map9 (_Bool stop_when_some_added, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m, const char *file, const int line, const char *func) |
Maps simply. More... | |
int | kmr_map_on_rank_zero (KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m) |
Maps on rank0 only. More... | |
int | kmr_map_once (KMR_KVS *kvo, void *arg, struct kmr_option opt, _Bool rank_zero_only, kmr_mapfn_t m) |
Maps once. More... | |
static int | kmr_map_parked (struct kmr_kv_box *ev, long evcnt, long mapcount, _Bool k_reclaim, _Bool v_reclaim, KMR_KVS *kvi, KMR_KVS *kvo, kmr_mapfn_t m, void *arg, struct kmr_option opt) |
int | kmr_map_rank_by_rank (KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m) |
Maps sequentially with rank by rank for debugging. More... | |
int | kmr_map_skipping (long from, long stride, long limit, _Bool stop_when_some_added, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m) |
Maps by skipping the number of entries. More... | |
int | kmr_move_kvs (KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt) |
Moves the contents of the input KVI to the output KVO. More... | |
int | kmr_pitch_rank (const struct kmr_kv_box kv, KMR_KVS *kvs) |
int | kmr_reduce9 (_Bool stop_when_some_added, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_redfn_t r, const char *file, const int line, const char *func) |
Reduces key-value pairs. More... | |
int | kmr_reduce_as_one (KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_redfn_t r) |
Calls a reduce-function once as if all key-value pairs had the same key. More... | |
static int | kmr_reduce_nothreading (KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_redfn_t r) |
static int | kmr_reduce_threading (_Bool stop_when_some_added, KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_redfn_t r) |
int | kmr_replicate (KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt) |
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gather. More... | |
int | kmr_restore_kvs (KMR_KVS *kvo, void *data, size_t sz_, struct kmr_option opt) |
Unpacks locally the contents of a key-value stream from a byte array. More... | |
int | kmr_retrieve_keyed_records (KMR_KVS *kvs, struct kmr_keyed_record *ev, long n, _Bool shuffling, _Bool ranking) |
Fills keyed records in an array for sorting. More... | |
int | kmr_retrieve_kvs_entries (KMR_KVS *kvs, struct kmr_kvs_entry **ev, long n) |
Fills local key-value entries in an array for inspection. More... | |
int | kmr_save_kvs (KMR_KVS *kvs, void **dataq, size_t *szq, struct kmr_option opt) |
Packs locally the contents of a key-value stream to a byte array. More... | |
int | kmr_scan_locally (KMR_KVS *kvi, KMR_KVS *carryin, KMR_KVS *kvo, KMR_KVS *carryout, kmr_redfn_t r) |
Scans every key-value with a reduce-function locally (independently on each rank). More... | |
int | kmr_shuffle (KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt) |
Shuffles key-value pairs to the appropriate destination ranks. More... | |
int | kmr_sort_locally (KMR_KVS *kvi, KMR_KVS *kvo, _Bool shuffling, struct kmr_option opt) |
Reorders key-value pairs in a single rank. More... | |
static int | kmr_sort_locally_lo (KMR_KVS *kvi, KMR_KVS *kvo, _Bool shuffling, _Bool ranking, struct kmr_option opt) |
signed long | kmr_stable_key (const struct kmr_kv_box kv, const KMR_KVS *kvs) |
static long | kmr_stable_key_opaque (const struct kmr_kv_box kv) |
int | kmr_take_one (KMR_KVS *kvi, struct kmr_kv_box *kv) |
Extracts a single key-value pair locally in the key-value stream KVI. More... | |
static void | kmr_unlink_kvs (KMR_KVS *kvs) |
Variables | |
int | KMR_API_ID = 0 |
const int | kmr_version = KMR_H |
KMR Base Implementation (on-memory operations).
KMR aims at fast shuffling and scalability, and provides modest utilities for programming with map-reduce. This part implements on-memory operations.
Definition in file kmrbase.c.
#define KEY | ( | V | ) |
#define FIN | ( | ) |
KMR* kmr_create_context | ( | const MPI_Comm | comm, |
const MPI_Info | conf, | ||
const char * | identifying_name | ||
) |
Makes a new KMR context (a context has type KMR).
A KMR context is a record of information common to all key-value streams. COMM is a communicator for internal use. The given communicator is duplicated internally to avoid conflicts with other MPI calls. MPI should be initialized with a thread support level of either MPI_THREAD_SERIALIZED or MPI_THREAD_MULTIPLE. CONF specifies configuration options; the caller should free it after the call. The options can differ on each rank (in this version). The configuration options are first read on rank0 from a file named by the environment variable "KMROPTION", and are then merged with the explicitly given ones. The KMROPTION file uses the Java properties file format (Latin characters only). Refer to the JDK documentation of "java.util.Properties" (the "load" method) for the file format. The explicitly given options take precedence. IDENTIFYING_NAME is just recorded in the context and has no specific use. It may be null.
int kmr_free_context | ( | KMR * | mr | ) |
Releases a context created with kmr_create_context().
KMR_KVS* kmr_create_kvs7 | ( | KMR * | mr, |
enum kmr_kv_field | kf, | ||
enum kmr_kv_field | vf, | ||
struct kmr_option | opt, | ||
const char * | file, | ||
const int | line, | ||
const char * | func | ||
) |
int kmr_move_kvs | ( | KMR_KVS * | kvi, |
KMR_KVS * | kvo, | ||
struct kmr_option | opt | ||
) |
Moves the contents of the input KVI to the output KVO.
It consumes the input KVI. Calling kmr_map() with a null map-function has the same effect. Effective-options: TAKE_CKPT. See struct kmr_option.
int kmr_free_kvs | ( | KMR_KVS * | kvs | ) |
Releases a key-value stream (type KMR_KVS).
Normally, mapper/shuffler/reducer consumes and frees the input key-value stream, and explicit calls are unnecessary. Here, mapper/shuffler/reducer includes kmr_map(), kmr_map_on_rank_zero(), kmr_map_ms(), kmr_shuffle(), kmr_replicate(), kmr_reduce(), and kmr_reduce_as_one().
int kmr_add_kv | ( | KMR_KVS * | kvs, |
const struct kmr_kv_box | kv | ||
) |
int kmr_add_kv1 | ( | KMR_KVS * | kvs, |
void * | k, | ||
int | klen, | ||
void * | v, | ||
int | vlen | ||
) |
int kmr_add_kv_space | ( | KMR_KVS * | kvs, |
const struct kmr_kv_box | kv, | ||
void ** | keyp, | ||
void ** | valuep | ||
) |
Adds a key-value pair, but only allocates a space and returns the pointers to the key and the value parts.
It lets the caller construct large key/value data directly in the allocated space. It does not return a proper value if a key/value field is not a pointer. (It cannot be used with a "push-off" key-value stream, because that stream's buffer may be sent out before the caller finishes filling it in, which causes a race.)
int kmr_add_kv_done | ( | KMR_KVS * | kvs | ) |
Marks finished adding key-value pairs.
Further addition will be prohibited. Normally, mapper/shuffler/reducer finishes the output key-value stream by itself, and explicit calls are unnecessary. Here, mapper/shuffler/reducer includes kmr_map(), kmr_map_on_rank_zero(), kmr_map_ms(), kmr_shuffle(), kmr_replicate(), and kmr_reduce().
int kmr_add_string | ( | KMR_KVS * | kvs, |
const char * | k, | ||
const char * | v | ||
) |
int kmr_add_identity_fn | ( | const struct kmr_kv_box | kv, |
const KMR_KVS * | kvi, | ||
KMR_KVS * | kvo, | ||
void * | arg, | ||
const long | i | ||
) |
int kmr_save_kvs | ( | KMR_KVS * | kvs, |
void ** | dataq, | ||
size_t * | szq, | ||
struct kmr_option | opt | ||
) |
Packs locally the contents of a key-value stream to a byte array.
It is used to save or to send a key-value stream. It returns the allocated memory together with its size; the user should free that memory. If allocating the buffer fails, it returns MPI_ERR_BUFFER. The reverse operation is performed by kmr_restore_kvs().
int kmr_restore_kvs | ( | KMR_KVS * | kvo, |
void * | data, | ||
size_t | sz_, | ||
struct kmr_option | opt | ||
) |
Unpacks locally the contents of a key-value stream from a byte array.
It is a reverse of kmr_save_kvs().
int kmr_map_skipping | ( | long | from, |
long | stride, | ||
long | limit, | ||
_Bool | stop_when_some_added, | ||
KMR_KVS * | kvi, | ||
KMR_KVS * | kvo, | ||
void * | arg, | ||
struct kmr_option | opt, | ||
kmr_mapfn_t | m | ||
) |
int kmr_map9 | ( | _Bool | stop_when_some_added, |
KMR_KVS * | kvi, | ||
KMR_KVS * | kvo, | ||
void * | arg, | ||
struct kmr_option | opt, | ||
kmr_mapfn_t | m, | ||
const char * | file, | ||
const int | line, | ||
const char * | func | ||
) |
Maps simply.
It consumes the input key-value stream KVI unless the INSPECT option is marked. The output key-value stream KVO can be null, but in that case a map-function cannot add key-value pairs. The pointer ARG is just passed to a map-function as a general argument. Accesses to it should be race-free, because a map-function is called by threads by default. M is the map-function; see the description of the type kmr_mapfn_t. When the map-function is null, it copies the contents of the input KVI to the output KVO. During processing, it first builds an array pointing to the key-value entries in each data block, and works on that array for ease of threading/parallelization. Effective-options: NOTHREADING, INSPECT, KEEP_OPEN, COLLAPSE, TAKE_CKPT. See struct kmr_option.
int kmr_map_rank_by_rank | ( | KMR_KVS * | kvi, |
KMR_KVS * | kvo, | ||
void * | arg, | ||
struct kmr_option | opt, | ||
kmr_mapfn_t | m | ||
) |
int kmr_take_one | ( | KMR_KVS * | kvi, |
struct kmr_kv_box * | kv | ||
) |
Extracts a single key-value pair locally in the key-value stream KVI.
It is an error if the KVI contains zero entries or more than one entry. It does not consume the input KVS (INSPECT IMPLIED). If the returned key-value entry points to opaque data, it must be used before the input KVS is freed.
int kmr_map_once | ( | KMR_KVS * | kvo, |
void * | arg, | ||
struct kmr_option | opt, | ||
_Bool | rank_zero_only, | ||
kmr_mapfn_t | m | ||
) |
Maps once.
It calls a map-function once with a dummy key-value stream and a dummy key-value pair. See kmr_map(). Effective-options: KEEP_OPEN, TAKE_CKPT. See struct kmr_option.
int kmr_map_on_rank_zero | ( | KMR_KVS * | kvo, |
void * | arg, | ||
struct kmr_option | opt, | ||
kmr_mapfn_t | m | ||
) |
Maps on rank0 only.
It calls a map-function once with a dummy key-value stream and a dummy key-value pair. It is used to avoid low-level conditionals like (myrank==0). See kmr_map(). Effective-options: KEEP_OPEN, TAKE_CKPT. See struct kmr_option.
|
static |
int kmr_sort_locally | ( | KMR_KVS * | kvi, |
KMR_KVS * | kvo, | ||
_Bool | shuffling, | ||
struct kmr_option | opt | ||
) |
Reorders key-value pairs in a single rank.
It sorts pairs when SHUFFLING is false, or places pairs with the same hashed keys next to each other when SHUFFLING is true. When shuffling, it respects only key equality, not ordering. When shuffling, the sort keys are the destination ranks (the hashed key modulo nprocs). The sort is NOT stable, because it uses quick-sort internally. It converts pointer keys and values to opaque ones for sending.
Sorting on a key-value stream is by memcmp(), unless the keys are integer or floating-point numbers (ordering on integers and memcmp() are different). Sorting on non-numbers is performed in two steps: the first step sorts by the integer rankings, and the second by the specified comparator. And thus, the comparator is required to have a corresponding generator of integer rankings. It consumes the input key-value stream. Effective-options: NOTHREADING, INSPECT, KEY_AS_RANK.
int kmr_shuffle | ( | KMR_KVS * | kvi, |
KMR_KVS * | kvo, | ||
struct kmr_option | opt | ||
) |
Shuffles key-value pairs to the appropriate destination ranks.
It first sorts pairs by the destination ranks of the keys, and then exchanges pairs with all-to-all communication. It converts pointer keys and values to opaque ones for sending during the sorting stage. Note that the key-value pairs are sorted by the hash-values prior to exchange. Effective-options: INSPECT, KEY_AS_RANK, TAKE_CKPT. See struct kmr_option.
int kmr_replicate | ( | KMR_KVS * | kvi, |
KMR_KVS * | kvo, | ||
struct kmr_option | opt | ||
) |
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gather.
With the RANK_ZERO option, it gathers the pairs on rank0 only. The move is stable: it keeps both the ordering of ranks and the ordering of local key-value pairs. Effective-options: INSPECT, RANK_ZERO, TAKE_CKPT. See struct kmr_option.
int kmr_reduce9 | ( | _Bool | stop_when_some_added, |
KMR_KVS * | kvi, | ||
KMR_KVS * | kvo, | ||
void * | arg, | ||
struct kmr_option | opt, | ||
kmr_redfn_t | r, | ||
const char * | file, | ||
const int | line, | ||
const char * | func | ||
) |
Reduces key-value pairs.
It does not include shuffling, so it should normally be preceded by shuffling. If it is not preceded by shuffling, it works on local data (as a local combiner). It always consumes the input key-value stream KVI. The output key-value stream KVO can be null. It passes to the reduce-function an array of key-value pairs whose keys are all equal (equality is bitwise). The pointer ARG is just passed to a reduce-function as a general argument. Accesses to it should be race-free, because a reduce-function is called by threads by default. R is a reduce-function; see the description of the type kmr_redfn_t. A reduce-function may see a different input key-value stream (one created internally) instead of the one given. During reduction, it first scans for runs of adjacent equal keys, then calls the given reduce-function on each run. Effective-options: NOTHREADING, INSPECT, TAKE_CKPT. See struct kmr_option.
int kmr_reduce_as_one | ( | KMR_KVS * | kvi, |
KMR_KVS * | kvo, | ||
void * | arg, | ||
struct kmr_option | opt, | ||
kmr_redfn_t | r | ||
) |
Calls a reduce-function once as if all key-value pairs had the same key.
See kmr_reduce(). Effective-options: INSPECT, TAKE_CKPT. See struct kmr_option.
int kmr_concatenate_kvs | ( | KMR_KVS * | kvs[], |
int | nkvs, | ||
KMR_KVS * | kvo, | ||
struct kmr_option | opt | ||
) |
Concatenates a number of KVSes to one.
Inputs are consumed. (It is fast because the key-value data is stored internally as a list of data blocks, and this routine just links them). Note that concatenating KVS can in effect be performed by consecutive calls to kmr_map() with the KEEP_OPEN option using the same output KVS. Effective-options: none.
struct kmr_kvs_entry* kmr_find_kvs_last_entry | ( | KMR_KVS * | kvs | ) |
int kmr_retrieve_kvs_entries | ( | KMR_KVS * | kvs, |
struct kmr_kvs_entry ** | ev, | ||
long | n | ||
) |
int kmr_retrieve_keyed_records | ( | KMR_KVS * | kvs, |
struct kmr_keyed_record * | ev, | ||
long | n, | ||
_Bool | shuffling, | ||
_Bool | ranking | ||
) |
int kmr_legal_minimum_field_size | ( | KMR * | mr, |
enum kmr_kv_field | f | ||
) |
int kmr_scan_locally | ( | KMR_KVS * | kvi, |
KMR_KVS * | carryin, | ||
KMR_KVS * | kvo, | ||
KMR_KVS * | carryout, | ||
kmr_redfn_t | r | ||
) |
Scans every key-value with a reduce-function locally (independently on each rank).
It processes entries in the order they appear in the KVS, and ignores differences between keys. It takes the start value from CARRYIN and puts the final value into CARRYOUT. The output has the same number of entries as the input. The carry-in and carry-out each have one entry, and the carry-out can be null. The reduce-function is called on each key-value pair, with the pair as the right operand and the previous value as the left operand, and it should output a single value. The key part of the output is ignored, and the value is stored under the original key.