KMR
graysort.c
1 /* graysort.c (2014-02-04) */
2 
3 /* GraySort. THIS IS NOT FOR BENCHMARKING. See the FAQ for the
4  benchmark rules in TeraSort in "http://sortbenchmark.org". */
5 
6 /* HOW TO RUN: First, run "gensort-1.2/gensort 1000000 aaa" to make a
7  data file. Second, run "mpirun -np n a.out" to sort "aaa" into
8  "bbb.nnnnn", where nnnnn are the ranks of the nodes. Third, run
9  "cat bbb.* > bbb" to concatenate outputs. Then, run
10  "gensort-1.2/valsort bbb" for sorted-ness and the checksum. */
11 
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <assert.h>
#include "kmr.h"
21 
22 /* GraySort constants. */
23 
24 #define RECLEN 100
25 #define KEYLEN 10
26 
/* Returns wall-clock seconds elapsed since the first call (the first
   call itself returns 0.0).  The reference time is latched in a
   static on first use, so the first call is not thread-safe.  Needs
   <sys/time.h> for gettimeofday()/struct timeval. */
static double
wtime()
{
    static struct timeval tv0 = {.tv_sec = 0};
    struct timeval tv;
    int cc;
    cc = gettimeofday(&tv, 0);
    assert(cc == 0);
    (void)cc; /* avoid unused-variable warning when NDEBUG disables assert */
    if (tv0.tv_sec == 0) {
        /* First call: latch the epoch.  (Assumes the clock is past
           1970, so tv_sec != 0 — always true in practice.) */
        tv0 = tv;
        assert(tv0.tv_sec != 0);
    }
    double dt = ((double)(tv.tv_sec - tv0.tv_sec)
                 + ((double)(tv.tv_usec - tv0.tv_usec) * 1e-6));
    return dt;
}
43 
44 static int
45 read_records(char *n, KMR_KVS *kvo)
46 {
47  KMR *mr = kvo->c.mr;
48  int nprocs = mr->nprocs;
49  int rank = mr->rank;
50  int cc;
51  FILE *f = fopen(n, "r");
52  if (f == 0) {
53  char ee[80];
54  snprintf(ee, 80, "fopen(%s)", n);
55  perror(ee);
56  MPI_Abort(MPI_COMM_WORLD, 1);
57  }
58  int fd = fileno(f);
59  struct stat s;
60  cc = fstat(fd, &s);
61  if (cc != 0) {
62  char ee[80];
63  snprintf(ee, 80, "fstat(%s)", n);
64  perror(ee);
65  MPI_Abort(MPI_COMM_WORLD, 1);
66  }
67  assert((s.st_size % RECLEN) == 0);
68  long totrecs = (s.st_size / RECLEN);
69  long erecs = ((totrecs + nprocs - 1) / nprocs);
70  long nrecs = ((rank != (nprocs - 1))
71  ? erecs
72  : (totrecs - (erecs * (nprocs - 1))));
73  off_t off = (RECLEN * erecs * rank);
74  cc = fseeko(f, off, SEEK_SET);
75  if (cc != 0) {
76  char ee[80];
77  snprintf(ee, 80, "fseek(%s,%zd)", n, off);
78  perror(ee);
79  MPI_Abort(MPI_COMM_WORLD, 1);
80  }
81  size_t zz;
82  for (long i = 0 ; i < nrecs; i++) {
83  char b[RECLEN];
84  zz = fread(b, sizeof(b), 1, f);
85  if (zz < 1) {
86  char ee[80];
87  int eof = feof(f);
88  assert(eof != 0);
89  snprintf(ee, 80, "fread(%s,%ld)", n, i);
90  perror(ee);
91  MPI_Abort(MPI_COMM_WORLD, 1);
92  }
93  assert(zz == 1);
94  struct kmr_kv_box kv = {
95  .klen = KEYLEN, .vlen = (RECLEN - KEYLEN),
96  .k.p = b, .v.p = &b[KEYLEN]};
97  cc = kmr_add_kv(kvo, kv);
98  assert(cc == MPI_SUCCESS);
99  }
100  cc = fclose(f);
101  if (cc != 0) {
102  char ee[80];
103  snprintf(ee, 80, "fclose(%s)", n);
104  perror(ee);
105  MPI_Abort(MPI_COMM_WORLD, 1);
106  }
107  cc = kmr_add_kv_done(kvo);
108  assert(cc == MPI_SUCCESS);
109  long cnt;
110  cc = kmr_get_element_count(kvo, &cnt);
111  assert(cc == MPI_SUCCESS);
112  assert(totrecs == cnt);
113  return MPI_SUCCESS;
114 }
115 
#if 0
/* (Dead code, kept for reference.)  Reduction function that sums the
   integer values of all pairs sharing a key and emits one (rank, sum)
   pair.  It was intended for the disabled kmr_scan() path in
   write_records() that computes a per-rank record-count prefix for
   file offsets. */
static int
sumreducefn(const struct kmr_kv_box kv[], const long n,
            const KMR_KVS *kvi, KMR_KVS *kvo, void *p)
{
    int rank = kvi->c.mr->rank;
    int cc;
    long v = 0;
    /* Sum the integer values of all N grouped pairs. */
    for (long i = 0; i < n; i++) {
        v += kv[i].v.i;
    }
    struct kmr_kv_box kv0 = {
        .klen = sizeof(long), .vlen = sizeof(long),
        .k.i = rank, .v.i = v};
    cc = kmr_add_kv(kvo, kv0);
    assert(cc == MPI_SUCCESS);
    return MPI_SUCCESS;
}
#endif
135 
136 static int
137 writekvfn(const struct kmr_kv_box kv0,
138  const KMR_KVS *kvs0, KMR_KVS *kvo, void *p, const long i)
139 {
140  FILE *f = p;
141  size_t zz;
142  zz = fwrite(kv0.k.p, KEYLEN, 1, f);
143  assert(zz == 1);
144  zz = fwrite(kv0.v.p, (RECLEN - KEYLEN), 1, f);
145  assert(zz == 1);
146  return MPI_SUCCESS;
147 }
148 
149 static int
150 write_records(char *n0, KMR_KVS *kvi)
151 {
152  KMR *mr = kvi->c.mr;
153  //int nprocs = mr->nprocs;
154  int rank = mr->rank;
155 #if 0
156  long cnt = kvi->c.element_count;
157  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
158  struct kmr_kv_box kv0 = {
159  .klen = sizeof(long), .vlen = sizeof(long),
160  .k.i = rank, .v.i = cnt};
161  cc = kmr_add_kv(kvs0, kv0);
162  assert(cc == MPI_SUCCESS);
163  cc = kmr_add_kv_done(kvs0);
164  assert(cc == MPI_SUCCESS);
165  /* Entries are sorted. */
166  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_INTEGER, KMR_KV_INTEGER);
167  cc = kmr_scan(kvs0, kvs1, sumreducefn, kmr_noopt);
168  assert(cc == MPI_SUCCESS);
169  struct kmr_kv_box kv1;
170  cc = kmr_take_one(kvs1, &kv1);
171  assert(cc == MPI_SUCCESS);
172  long precs = kv1.v.i;
173  off_t off = (precs * RECLEN);
174 #endif
175  char n[80];
176  snprintf(n, sizeof(n), "%s.%05d", n0, rank);
177  int cc;
178  FILE *f = fopen(n, "w");
179  if (f == 0) {
180  char ee[80];
181  snprintf(ee, 80, "fopen(%s)", n);
182  perror(ee);
183  MPI_Abort(MPI_COMM_WORLD, 1);
184  }
185  struct kmr_option nothreading = {.nothreading = 1};
186  cc = kmr_map(kvi, 0, f, nothreading, writekvfn);
187  assert(cc == MPI_SUCCESS);
188  cc = fclose(f);
189  if (cc != 0) {
190  char ee[80];
191  snprintf(ee, 80, "fclose(%s)", n);
192  perror(ee);
193  MPI_Abort(MPI_COMM_WORLD, 1);
194  }
195  return MPI_SUCCESS;
196 }
197 
198 int
199 main(int argc, char *argv[])
200 {
201  int cc;
202  int nprocs, rank, thlv;
203  /*MPI_Init(&argc, &argv);*/
204  MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &thlv);
205  MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
206  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
207 
208  kmr_init();
209  KMR *mr = kmr_create_context(MPI_COMM_WORLD, MPI_INFO_NULL, 0);
210  //mr->one_step_sort = 0;
211  //mr->preset_block_size = 750;
212  MPI_Barrier(MPI_COMM_WORLD);
213  if (rank == 0) {printf("reading file...\n");}
214 
215  KMR_KVS *kvs0 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
216  cc = read_records("aaa", kvs0);
217  assert(cc == MPI_SUCCESS);
218  if (rank == 0) {printf("count=%ld\n", kvs0->c.element_count);}
219 
220  MPI_Barrier(MPI_COMM_WORLD);
221  if (rank == 0) {printf("run sort...\n");}
222 
223  MPI_Barrier(MPI_COMM_WORLD);
224  double t0 = wtime();
225  KMR_KVS *kvs1 = kmr_create_kvs(mr, KMR_KV_OPAQUE, KMR_KV_OPAQUE);
226  cc = kmr_sort_large(kvs0, kvs1, kmr_noopt);
227  assert(cc == MPI_SUCCESS);
228  MPI_Barrier(MPI_COMM_WORLD);
229  double t1 = wtime();
230 
231  if (rank == 0) {printf("run sort in %f sec\n", (t1 - t0));}
232 
233  printf("count=%ld\n", kvs1->c.element_count);
234 
235  cc = write_records("bbb", kvs1);
236  assert(cc == MPI_SUCCESS);
237 
238  kmr_fin();
239  MPI_Finalize();
240 
241  return 0;
242 }
Key-Value Stream (abstract).
Definition: kmr.h:587
Options to Mapping, Shuffling, and Reduction.
Definition: kmr.h:613
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair.
Definition: kmrbase.c:751
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes.
Definition: kmr.h:71
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs.
Definition: kmrbase.c:881
#define KEYLEN
Maximum key-value string length.
Definition: wc.reducer.c:13
KMR Context.
Definition: kmr.h:222
int kmr_take_one(KMR_KVS *kvi, struct kmr_kv_box *kv)
Extracts a single key-value pair locally in the key-value stream KVI.
Definition: kmrbase.c:1369
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply.
Definition: kmr.h:82
Handy Copy of a Key-Value Field.
Definition: kmr.h:358
int kmr_sort_large(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Sorts a key-value stream by the regular or the random sampling-sort.
Definition: kmrmoreops.c:469
int kmr_fin(void)
Clears the environment.
Definition: kmrbase.c:124
int kmr_get_element_count(KMR_KVS *kvs, long *v)
Gets the total number of key-value pairs.
Definition: kmrmoreops.c:114
#define kmr_init()
Sets up the environment.
Definition: kmr.h:747
KMR Interface.
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).
Definition: kmrbase.c:147