17 #include <sys/types.h> 30 static struct timeval tv0 = {.tv_sec = 0};
33 cc = gettimeofday(&tv, 0);
35 if (tv0.tv_sec == 0) {
37 assert(tv0.tv_sec != 0);
39 double dt = ((double)(tv.tv_sec - tv0.tv_sec)
40 + ((double)(tv.tv_usec - tv0.tv_usec) * 1e-6));
45 read_records(
char *n,
KMR_KVS *kvo)
48 int nprocs = mr->nprocs;
51 FILE *f = fopen(n,
"r");
54 snprintf(ee, 80,
"fopen(%s)", n);
56 MPI_Abort(MPI_COMM_WORLD, 1);
63 snprintf(ee, 80,
"fstat(%s)", n);
65 MPI_Abort(MPI_COMM_WORLD, 1);
67 assert((s.st_size % RECLEN) == 0);
68 long totrecs = (s.st_size / RECLEN);
69 long erecs = ((totrecs + nprocs - 1) / nprocs);
70 long nrecs = ((rank != (nprocs - 1))
72 : (totrecs - (erecs * (nprocs - 1))));
73 off_t off = (RECLEN * erecs * rank);
74 cc = fseeko(f, off, SEEK_SET);
77 snprintf(ee, 80,
"fseek(%s,%zd)", n, off);
79 MPI_Abort(MPI_COMM_WORLD, 1);
82 for (
long i = 0 ; i < nrecs; i++) {
84 zz = fread(b,
sizeof(b), 1, f);
89 snprintf(ee, 80,
"fread(%s,%ld)", n, i);
91 MPI_Abort(MPI_COMM_WORLD, 1);
96 .k.p = b, .v.p = &b[
KEYLEN]};
98 assert(cc == MPI_SUCCESS);
103 snprintf(ee, 80,
"fclose(%s)", n);
105 MPI_Abort(MPI_COMM_WORLD, 1);
108 assert(cc == MPI_SUCCESS);
111 assert(cc == MPI_SUCCESS);
112 assert(totrecs == cnt);
118 sumreducefn(
const struct kmr_kv_box kv[],
const long n,
121 int rank = kvi->c.mr->rank;
124 for (
long i = 0; i < n; i++) {
128 .klen =
sizeof(long), .vlen =
sizeof(
long),
129 .k.i = rank, .v.i = v};
131 assert(cc == MPI_SUCCESS);
142 zz = fwrite(kv0.k.p,
KEYLEN, 1, f);
144 zz = fwrite(kv0.v.p, (RECLEN -
KEYLEN), 1, f);
150 write_records(
char *n0,
KMR_KVS *kvi)
156 long cnt = kvi->c.element_count;
159 .klen =
sizeof(long), .vlen =
sizeof(
long),
160 .k.i = rank, .v.i = cnt};
162 assert(cc == MPI_SUCCESS);
164 assert(cc == MPI_SUCCESS);
167 cc = kmr_scan(kvs0, kvs1, sumreducefn, kmr_noopt);
168 assert(cc == MPI_SUCCESS);
171 assert(cc == MPI_SUCCESS);
172 long precs = kv1.v.i;
173 off_t off = (precs * RECLEN);
176 snprintf(n,
sizeof(n),
"%s.%05d", n0, rank);
178 FILE *f = fopen(n,
"w");
181 snprintf(ee, 80,
"fopen(%s)", n);
183 MPI_Abort(MPI_COMM_WORLD, 1);
185 struct kmr_option nothreading = {.nothreading = 1};
186 cc =
kmr_map(kvi, 0, f, nothreading, writekvfn);
187 assert(cc == MPI_SUCCESS);
191 snprintf(ee, 80,
"fclose(%s)", n);
193 MPI_Abort(MPI_COMM_WORLD, 1);
199 main(
int argc,
char *argv[])
202 int nprocs, rank, thlv;
204 MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &thlv);
205 MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
206 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
212 MPI_Barrier(MPI_COMM_WORLD);
213 if (rank == 0) {printf(
"reading file...\n");}
216 cc = read_records(
"aaa", kvs0);
217 assert(cc == MPI_SUCCESS);
218 if (rank == 0) {printf(
"count=%ld\n", kvs0->c.element_count);}
220 MPI_Barrier(MPI_COMM_WORLD);
221 if (rank == 0) {printf(
"run sort...\n");}
223 MPI_Barrier(MPI_COMM_WORLD);
227 assert(cc == MPI_SUCCESS);
228 MPI_Barrier(MPI_COMM_WORLD);
231 if (rank == 0) {printf(
"run sort in %f sec\n", (t1 - t0));}
233 printf(
"count=%ld\n", kvs1->c.element_count);
235 cc = write_records(
"bbb", kvs1);
236 assert(cc == MPI_SUCCESS);
Key-Value Stream (abstract).
Options to Mapping, Shuffling, and Reduction.
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
#define KEYLEN
Maximum key-value string length.
int kmr_take_one(KMR_KVS *kvi, struct kmr_kv_box *kv)
Extracts a single key-value pair locally in the key-value stream KVI.
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Handy Copy of a Key-Value Field.
int kmr_sort_large(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Sorts a key-value stream by the regular or the random sampling-sort.
int kmr_fin(void)
Clears the environment.
int kmr_get_element_count(KMR_KVS *kvs, long *v)
Gets the total number of key-value pairs.
#define kmr_init()
Sets up the environment.
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).