68 #include <sys/types.h>    70 #include <sys/resource.h>    71 #include <netinet/in.h>    77 #define MIN(a,b) (((a)<(b))?(a):(b))    83     _Bool load_tables_in_advance;
    84     _Bool hang_out_communication;
    85     _Bool redistribute_loaded_tables;
    86     _Bool use_small_block_size;
    87     size_t pushoff_block_size_in_kilo;
    90 _Bool load_tables_in_advance_;
    91 _Bool redistribute_loaded_tables_;
    93 _Bool files_in_rank_directory = 0;
    95 _Bool report_count_in_messages = 0;
    96 _Bool report_time_to_read = 0;
    97 _Bool report_pushoff_statistics = 0;
    99 #undef USE_TIME_FUNCTIONS   102 #define NAME_SIZE (25)   106 enum {T_FST = 0, T_SND = 1};
   /* FST and SND expose the enum constants T_FST (0) and T_SND (1) as ints. */
   /* strnlen_(s, n): bounded length of S.  The visible lines scan P up to
      limit = s + n or a NUL byte and return (p - s).  The declaration of P
      is not visible in this excerpt (presumably it starts at S). */
   107 #define FST ((int)T_FST)   108 #define SND ((int)T_SND)   115 strnlen_(
const char *s, 
size_t n)
   121     const char *limit = (s + n);
   /* NOTE(review): "*p" is evaluated before "p < limit", so when no NUL
      occurs within the first N bytes the byte at LIMIT is read before the
      bound is checked -- consider testing the bound first. */
   122     while (*p != 0 && p < limit) {
   125     return (
size_t)(p - s);
   /* strnstr_(s1, s2, n): bounded substring search of S2 in S1.  A null S1,
      null S2 or empty S2 is handled by the first test (its result is not
      visible in this excerpt). */
   131 strnstr_(
const char *s1, 
const char *s2, 
size_t n)
   133     if (s1 == 0 || s2 == 0 || *s2 == 0) {
   /* LEN is the length of S2 capped at N; LIMIT is one past the last start
      position in S1 at which LEN bytes still fit inside the first N bytes. */
   136     size_t len = strnlen_(s2, n);
   137     const char *limit = (s1 + n - len + 1);
   /* NOTE(review): "*p" is read before "p < limit" is tested. */
   140     while (*p != 0 && p < limit) {
   /* Compare the first character (c0, defined on a line not shown) before
      the full strncmp. */
   141         if (*p == c0 && strncmp(p, s2, len) == 0) {
   154     union {uint64_t ll; uint32_t i[2];} v;
   155     uint32_t lo = (uint32_t)x;
   156     uint32_t hi = (uint32_t)(x >> 32);
   /* State of the option scanner below: current option argument, index of
      the next argv element to look at (starts at 1), and current option
      character. */
   164 static char *xxx_optarg;
   165 static int xxx_optind = 1;
   166 static int xxx_optopt;
   /* xxx_getopt(argc, argv, optstring): option scanner -- looks like a local
      stand-in for getopt(3); confirm against the full definition.  Return
      values are not visible in this excerpt. */
   170 xxx_getopt(
int argc, 
char **argv, 
char *optstring)
   /* No arguments left. */
   173     if (xxx_optind >= argc) {
   176     char *p = argv[xxx_optind];
   180     } 
else if (strcmp(p, 
"-") == 0) {
   183     } 
else if (strcmp(p, 
"--") == 0) {
   /* Look the option character up in OPTSTRING; a following ':' is tested
      next. */
   189         char *o = strchr(optstring, xxx_optopt);
   192         } 
else if (o[1] != 
':') {
   /* Take the option argument from the next argv element when there is one. */
   201                 if (xxx_optind < argc) {
   202                     xxx_optarg = argv[xxx_optind];
   220     TAB_N, TAB_R, TAB_P, TAB_S, TAB_H, TAB_C, TAB_O, TAB_L,
   222     TAB_Q7_N1, TAB_Q7_NN, TAB_Q7_S1, TAB_Q7_NNS,
   223     TAB_Q7_C1, TAB_Q7_O1, TAB_Q7_CO, TAB_Q7_L1,
   224     TAB_Q7_CLO, TAB_Q7_CLNNOS0, TAB_Q7_CLNNOS1, TAB_Q7_REVENUE,
   226     TAB_Q9, TAB_Q9_PH, TAB_Q9_NS, TAB_Q9_PHSN,
   227     TAB_Q9_LO, TAB_Q9_PHSNLO, TAB_Q9_AMOUNT,
   229     TAB_Q10_L1, TAB_Q10_O1, TAB_Q10_LO, TAB_Q10_C1,
   230     TAB_Q10_N1, TAB_Q10_CN, TAB_Q10_CLNO0, TAB_Q10_CLNO1,
   232     TAB_Q13_O1, TAB_Q13_CO0, TAB_Q13_CO1,
   234     TAB_Q21_N1, TAB_Q21_NS, TAB_Q21_L1, TAB_Q21_O1,
   235     TAB_Q21_L3, TAB_Q21_LNS, TAB_Q21_NAME,
   239 KMR_KVS *N0, *R0, *P0, *S0, *H0, *C0, *O0, *L0;
   241 enum FIELD {F_NIL, F_ZAHL, F_REAL, F_TEXT, F_DATE};
   250     struct COLUMN columns[MAXCOLS];
   252     struct COLUMN keys[MAXCOLS];
   257 #define RECORD_SIZE (1024)   270     struct RECORD description;
   272     {TAB_N, {&N0, 
"nation.tbl", 1, 0, 0, 0, 0},
   273      {4, {{
"n_nationkey", F_ZAHL},
   275           {
"n_regionkey", F_ZAHL},
   276           {
"n_comment", F_TEXT}},
   277       1, {{
"n_nationkey", F_ZAHL}}}},
   278     {TAB_R, {&R0, 
"region.tbl", 1, 0, 0, 0, 0},
   279      {3, {{
"r_regionkey", F_ZAHL},
   281           {
"r_comment", F_TEXT}},
   282       1, {{
"r_regionkey", F_ZAHL}}}},
   283     {TAB_P, {&P0, 
"part.tbl", 2, 0, 0, 0, 0},
   284      {9, {{
"p_partkey", F_ZAHL},
   290           {
"p_container", F_TEXT},
   291           {
"p_retailprice", F_REAL},
   292           {
"p_comment", F_TEXT}},
   293       1, {{
"p_partkey", F_ZAHL}}}},
   294     {TAB_S, {&S0, 
"supplier.tbl", 2, 0, 0, 0, 0},
   295      {7, {{
"s_suppkey", F_ZAHL},
   297           {
"s_address", F_TEXT},
   298           {
"s_nationkey", F_ZAHL},
   300           {
"s_acctbal", F_REAL},
   301           {
"s_comment", F_TEXT}},
   302       1, {{
"s_suppkey", F_ZAHL}}}},
   303     {TAB_H, {&H0, 
"partsupp.tbl", 2, 0, 0, 0, 0},
   304      {5, {{
"ps_partkey", F_ZAHL},
   305           {
"ps_suppkey", F_ZAHL},
   306           {
"ps_availqty", F_ZAHL},
   307           {
"ps_supplycost", F_REAL},
   308           {
"ps_comment", F_TEXT}},
   309       1, {{
"ps_partkey", F_ZAHL}}}},
   310     {TAB_C, {&C0, 
"customer.tbl", 2, 0, 0, 0, 0},
   311      {8, {{
"c_custkey", F_ZAHL},
   313           {
"c_address", F_TEXT},
   314           {
"c_nationkey", F_ZAHL},
   316           {
"c_acctbal", F_REAL},
   317           {
"c_mktsegment", F_TEXT},
   318           {
"c_comment", F_TEXT}},
   319       1, {{
"c_custkey", F_ZAHL}}}},
   320     {TAB_O, {&O0, 
"orders.tbl", 2, 0, 0, 0, 0},
   321      {9, {{
"o_orderkey", F_ZAHL},
   322           {
"o_custkey", F_ZAHL},
   323           {
"o_orderstatus", F_TEXT},
   324           {
"o_totalprice", F_REAL},
   325           {
"o_orderdate", F_DATE},
   326           {
"o_orderpriority", F_TEXT},
   328           {
"o_shippriority", F_ZAHL},
   329           {
"o_comment", F_TEXT}},
   330       1, {{
"o_orderkey", F_ZAHL}}}},
   331     {TAB_L, {&L0, 
"lineitem.tbl", 2, 0, 0, 0, 0},
   332      {16, {{
"l_orderkey", F_ZAHL},
   333            {
"l_partkey", F_ZAHL},
   334            {
"l_suppkey", F_ZAHL},
   335            {
"l_linenumber", F_REAL},
   336            {
"l_quantity", F_REAL},
   337            {
"l_extendedprice", F_REAL},
   338            {
"l_discount", F_REAL},
   340            {
"l_returnflag", F_TEXT},
   341            {
"l_linestatus", F_TEXT},
   342            {
"l_shipdate", F_DATE},
   343            {
"l_commitdate", F_DATE},
   344            {
"l_receiptdate", F_DATE},
   345            {
"l_shipinstruct", F_TEXT},
   346            {
"l_shipmode", F_TEXT},
   347            {
"l_comment", F_TEXT}},
   348       1, {{
"l_orderkey", F_ZAHL}}}},
   353      {2, {{
"n_nationkey", F_ZAHL},
   357      {4, {{
"n1.n_nationkey", F_ZAHL},
   358           {
"n1.n_name", F_TEXT},
   359           {
"n2.n_nationkey", F_ZAHL},
   360           {
"n2.n_name", F_TEXT}},
   361       1, {{
"n1.n_nationkey", F_ZAHL}}}},
   363      {2, {{
"s_nationkey", F_ZAHL},
   364           {
"s_suppkey", F_ZAHL}},
   365       1, {{
"s_nationkey", F_ZAHL}}}},
   367      {4, {{
"n2.n_nationkey", F_ZAHL},
   368           {
"s_suppkey", F_ZAHL},
   369           {
"n1.n_name", F_TEXT},
   370           {
"n2.n_name", F_TEXT}},
   371       2, {{
"n2.s_nationkey", F_ZAHL},
   372           {
"s_suppkey", F_ZAHL}}}},
   374      {2, {{
"c_custkey", F_ZAHL},
   375           {
"c_nationkey", F_ZAHL}},
   376       1, {{
"c_custkey", F_ZAHL}}}},
   378      {2, {{
"o_custkey", F_ZAHL},
   379           {
"o_orderkey", F_ZAHL}},
   380       1, {{
"o_custkey", F_ZAHL}}}},
   382      {2, {{
"o_orderkey", F_ZAHL},
   383           {
"c_nationkey", F_ZAHL}},
   384       1, {{
"o_orderkey", F_ZAHL}}}},
   386      {4, {{
"l_orderkey", F_ZAHL},
   387           {
"l_suppkey", F_ZAHL},
   390       1, {{
"l_orderkey", F_ZAHL}}}},
   392      {4, {{
"c_nationkey", F_ZAHL},
   393           {
"l_suppkey", F_ZAHL},
   396       2, {{
"c_nationkey", F_ZAHL},
   397           {
"l_suppkey", F_ZAHL}}}},
   398     {TAB_Q7_CLNNOS0, {0},
   399      {4, {{
"n1.n_name", F_TEXT},
   400           {
"n2.n_name", F_TEXT},
   404     {TAB_Q7_CLNNOS1, {0},
   405      {4, {{
"n1.n_name", F_TEXT},
   406           {
"n2.n_name", F_TEXT},
   409       3, {{
"n1.n_name", F_TEXT},
   410           {
"n2.n_name", F_TEXT},
   412     {TAB_Q7_REVENUE, {0},
   413      {4, {{
"n1.n_name", F_TEXT},
   414           {
"n2.n_name", F_TEXT},
   416           {
"revenue", F_REAL}},
   417       3, {{
"n1.n_name", F_TEXT},
   418           {
"n2.n_name", F_TEXT},
   424      {3, {{
"n_name", F_TEXT},
   425           {
"o_orderdate", F_DATE},
   427       1, {{
"nation+year", F_NIL}}}},
   429      {3, {{
"ps_partkey", F_ZAHL},
   430           {
"ps_suppkey", F_ZAHL},
   431           {
"ps_supplycost", F_REAL}},
   432       1, {{
"ps_suppkey", F_ZAHL}}}},
   434      {2, {{
"n_name", F_TEXT},
   435           {
"s_suppkey", F_ZAHL}},
   436       1, {{
"s_suppkey", F_ZAHL}}}},
   438      {4, {{
"n_name", F_TEXT},
   439           {
"ps_partkey", F_ZAHL},
   440           {
"ps_suppkey", F_ZAHL},
   441           {
"ps_supplycost", F_REAL}},
   442       2, {{
"ps_partkey", F_ZAHL},
   443           {
"ps_suppkey", F_ZAHL}}}},
   445      {6, {{
"l_discount", F_REAL},
   446           {
"l_extendedprice", F_REAL},
   447           {
"l_partkey", F_ZAHL},
   448           {
"l_quantity", F_REAL},
   449           {
"l_suppkey", F_ZAHL},
   450           {
"o_orderdate", F_DATE}},
   451       2, {{
"l_partkey", F_ZAHL},
   452           {
"l_suppkey", F_ZAHL}}}},
   454      {6, {{
"l_discount", F_REAL},
   455           {
"l_extendedprice", F_REAL},
   456           {
"l_quantity", F_REAL},
   458           {
"o_orderdate", F_DATE},
   459           {
"ps_supplycost", F_REAL}},
   460       1, {{
"o_orderdate", F_DATE}}}},
   462      {3, {{
"nation", F_TEXT},
   465       2, {{
"nation", F_TEXT},
   471      {2, {{
"l_orderkey", F_ZAHL},
   473       1, {{
"l_orderkey", F_ZAHL}}}},
   475      {2, {{
"o_orderkey", F_ZAHL},
   476           {
"o_custkey", F_ZAHL}},
   477       1, {{
"o_orderkey", F_ZAHL}}}},
   479      {2, {{
"o_custkey", F_ZAHL},
   481       1, {{
"o_custkey", F_ZAHL}}}},
   483      {7, {{
"c_nationkey", F_ZAHL},
   484           {
"c_custkey", F_ZAHL},
   486           {
"c_acctbal", F_REAL},
   488           {
"c_address", F_TEXT},
   489           {
"c_comment", F_TEXT}},
   490       1, {{
"c_nationkey", F_ZAHL}}}},
   492      {7, {{
"n_nationkey", F_ZAHL},
   494       1, {{
"n_nationkey", F_ZAHL}}}},
   496      {7, {{
"c_custkey", F_ZAHL},
   498           {
"c_acctbal", F_REAL},
   501           {
"c_address", F_TEXT},
   502           {
"c_comment", F_TEXT}},
   503       1, {{
"c_custkey", F_ZAHL}}}},
   505      {8, {{
"c_custkey", F_ZAHL},
   507           {
"c_acctbal", F_REAL},
   510           {
"c_address", F_TEXT},
   511           {
"c_comment", F_TEXT},
   513       7, {{
"c_custkey", F_ZAHL},
   515           {
"c_acctbal", F_REAL},
   518           {
"c_address", F_TEXT},
   519           {
"c_comment", F_TEXT}}}},
   521      {8, {{
"c_custkey", F_ZAHL},
   524           {
"c_acctbal", F_REAL},
   526           {
"c_address", F_TEXT},
   528           {
"c_comment", F_TEXT}},
   529       1, {{
"revenue", F_REAL}}}},
   534      {1, {{
"o_orderkey", F_ZAHL}},
   535       1, {{
"o_custkey", F_ZAHL}}}},
   537      {1, {{
"c_custkey", F_ZAHL}},
   538       1, {{
"q13_count", F_ZAHL}}}},
   540      {2, {{
"q13_custdist", F_ZAHL},
   541           {
"q13_count", F_ZAHL}},
   542       2, {{
"q13_custdist", F_ZAHL},
   543           {
"q13_count", F_ZAHL}}}},
   548      {1, {{
"n_nationkey", F_ZAHL}},
   549       1, {{
"n_nationkey", F_ZAHL}}}},
   551      {1, {{
"s_suppkey", F_ZAHL},
   553       1, {{
"s_suppkey", F_ZAHL}}}},
   555      {2, {{
"l_orderkey", F_ZAHL},
   556           {
"l_suppkey", F_ZAHL}},
   557       1, {{
"l_suppkey", F_ZAHL}}}},
   559      {2, {{
"l_orderkey", F_ZAHL},
   560           {
"l_suppkey", F_ZAHL}},
   561       1, {{
"l_orderkey", F_ZAHL}}}},
   563      {3, {{
"l_orderkey", F_ZAHL},
   564           {
"l_suppkey", F_ZAHL},
   566       1, {{
"l_orderkey", F_ZAHL}}}},
   568      {1, {{
"o_orderkey", F_ZAHL}},
   569       1, {{
"o_orderkey", F_ZAHL}}}},
   571      {1, {{
"s_name", F_TEXT}},
   572       1, {{
"s_name", F_TEXT}}}},
   573     {TAB_Q21_NUMWAIT, {0},
   574      {2, {{
"s_name", F_TEXT},
   575           {
"numwait", F_ZAHL}},
   576       1, {{
"numwait", F_ZAHL},
   577           {
"s_name", F_TEXT}}}}};
   585     int columns[MAXCOLS][2];
   587     int keys[MAXCOLS][2];
   589     _Bool trace_product_nonempty;
   606     int columns[MAXCOLS];
   615     char *columns[MAXCOLS];
   629     static struct timeval tv0 = {.tv_sec = 0};
   632     cc = gettimeofday(&tv, 0);
   634     if (tv0.tv_sec == 0) {
   636         assert(tv0.tv_sec != 0);
   638     double dt = ((double)(tv.tv_sec - tv0.tv_sec)
   639                  + ((double)(tv.tv_usec - tv0.tv_usec) * 1e-6));
   /* pcount(kvs0, kvs1, msg, before0after1): print a progress line for MSG.
      BEFORE0AFTER1 selects the wording: 0 gives "before" with a trailing
      "...", nonzero gives "after" with no suffix.  With
      report_count_in_messages set the line also carries one or two counts
      (c0, c1); how they are obtained and which ranks print is not visible in
      this excerpt.  KVS0 is dereferenced unconditionally, so it must be
      non-null. */
   646 pcount(
KMR_KVS *kvs0, 
KMR_KVS *kvs1, 
char *msg, _Bool before0after1)
   648     KMR *mr = kvs0->c.mr;
   649     if (report_count_in_messages) {
   657             char *s0 = (before0after1 == 0 ? 
"before" : 
"after");
   658             char *s1 = (before0after1 == 0 ? 
"..." : 
"");
   660                 printf(
"%s %s #=%ld #=%ld%s\n", msg, s0, c0, c1, s1);
   663                 printf(
"%s %s #=%ld%s\n", msg, s0, c0, s1);
   /* Count-free variant of the same message. */
   669             char *s0 = (before0after1 == 0 ? 
"before" : 
"after");
   670             char *s1 = (before0after1 == 0 ? 
"..." : 
"");
   671             printf(
"%s %s%s\n", msg, s0, s1);
   /* ptime(kvs0, kvs1, func, msg, dt): print "FUNC (MSG) ... in DT sec".
      With report_count_in_messages set the line includes one or two counts
      (c0, c1; their computation is not visible here); another branch prints
      the count-free form on rank 0.  KVS0 must be non-null.
      NOTE(review): callers in this file invoke ptime(x, y, 0, 0, 0.0); how
      null FUNC/MSG is handled is on lines not shown -- confirm the printf
      calls are not reached in that case. */
   680 ptime(
KMR_KVS *kvs0, 
KMR_KVS *kvs1, 
char *func, 
char *msg, 
double dt)
   682     KMR *mr = kvs0->c.mr;
   683     if (report_count_in_messages) {
   697                     printf(
"%s (%s) #=%ld #=%ld in %f sec\n",
   698                            func, msg, c0, c1, dt);
   701                     printf(
"%s (%s) #=%ld in %f sec\n",
   710         } 
else if (mr->rank == 0) {
   711             printf(
"%s (%s) in %f sec\n", func, msg, dt);
   /* phisto(kvs, msg): print one "MSG histo[r]=count" line for every rank r
      in [0, nprocs).  How HISTO is filled and which rank prints is on lines
      not visible in this excerpt. */
   718 phisto(
KMR_KVS *kvs, 
char *msg)
   721     int nprocs = mr->nprocs;
   726         for (
int r = 0; r < nprocs; r++) {
   727             printf(
"%s histo[%d]=%ld\n", msg, r, histo[r]);
   742         switch (kvo->c.key_data) {
   744             assert(kvo->c.key_data != KMR_KV_BAD);
   751             k1.d = *(
double *)e.p;
   755         case KMR_KV_POINTER_OWNED:
   756         case KMR_KV_POINTER_UNMANAGED:
   /* find_table(table): linear search of the file-level TABLES array for the
      entry whose NAME equals TABLE.  The result for a match and for a miss
      is on lines not visible in this excerpt. */
   783 find_table(
enum TABLE table)
   /* Element count of the TABLES array. */
   785     int ntables = (
sizeof(tables) / 
sizeof(tables[0]));
   786     for (
int i = 0; i < ntables; i++) {
   787         if (tables[i].name == table) {
   /* find_description(table): return the address of the DESCRIPTION record
      of a table entry TBL (presumably obtained via find_table; that line is
      not visible in this excerpt). */
   796 find_description(
enum TABLE table)
   799     return &(tbl->description);
   /* column_index_by_name(description, name): scan at most MAXCOLS column
      slots of DESCRIPTION, comparing each label with NAME via strcmp.  A
      null label is tested first (presumably marking the end of the used
      slots).  The values returned on a hit and on a miss are not visible in
      this excerpt. */
   803 column_index_by_name(
struct RECORD *description, 
char *name)
   805     for (
int i = 0; i < MAXCOLS; i++) {
   806         if (description->columns[i].label == 0) {
   809         if (strcmp(description->columns[i].label, name) == 0) {
   /* column_by_name(u, description, name): resolve NAME to a column index
      through column_index_by_name; what is then fetched from U is on lines
      not visible in this excerpt. */
   818 column_by_name(struct kmr_ntuple *u, 
struct RECORD *description, 
char *name)
   820     int c = column_index_by_name(description, name);
   /* put_columns_by_indexes(mr, v, vsz, u, cols, ncols): loop over the NCOLS
      entries of COLS.  The per-column action is not visible in this excerpt
      (presumably copying column cols[i] of U into V, whose buffer size is
      VSZ -- confirm against the full definition). */
   826 put_columns_by_indexes(
KMR *mr, 
struct kmr_ntuple *v, 
size_t vsz,
   827                        struct kmr_ntuple *u, 
int *cols, 
int ncols)
   829     for (
int i = 0; i < ncols; i++) {
   /* put_columns_by_names(mr, v, vsz, u, d, columns, ncolumns): loop over
      NCOLUMNS column names.  Each name is read from COLUMNS[i]; the lookup
      and the store into V are on lines not visible in this excerpt. */
   836 put_columns_by_names(
KMR *mr, 
struct kmr_ntuple *v, 
size_t vsz,
   837                      struct kmr_ntuple *u, 
struct RECORD *d,
   838                      char **columns, 
int ncolumns)
   840     for (
int i = 0; i < ncolumns; i++) {
   841         char *name = columns[i];
   /* assert_column_fields(inputs, output, columns, ncolumns, keys, nkeys):
      consistency check of a two-input column/key selection against the
      record description of the OUTPUT table.  Each COLUMNS[i] / KEYS[i] is
      a pair {column index, input selector}: selector 0 refers to the
      description of inputs[0] (A), anything else to inputs[1] (B). */
   849 assert_column_fields(
int inputs[2], 
int output,
   850                      int columns[][2], 
int ncolumns,
   851                      int keys[][2], 
int nkeys)
   853     struct RECORD *a = find_description((
enum TABLE)inputs[0]);
   854     struct RECORD *b = find_description((
enum TABLE)inputs[1]);
   855     struct RECORD *o = find_description((
enum TABLE)output);
   /* The output description must declare exactly NCOLUMNS columns. */
   856     assert(o->ncolumns == ncolumns);
   857     for (
int i = 0; i < ncolumns; i++) {
   858         int *choice = columns[i];
   /* The chosen index must be within the chosen input; FI is the field type
      of the source column and FO of the i-th output column (their
      comparison is on a line not shown). */
   859         assert(choice[0] < (choice[1] == 0 ? a : b)->ncolumns);
   860         enum FIELD fi = ((choice[1] == 0 ? a : b)->columns[choice[0]]).field;
   861         enum FIELD fo = o->columns[i].field;
   /* Same check for the key columns, against the output's key list. */
   864     assert(o->nkeys == nkeys);
   865     for (
int i = 0; i < nkeys; i++) {
   866         int *choice = keys[i];
   867         assert(choice[0] < (choice[1] == 0 ? a : b)->ncolumns);
   868         enum FIELD fi = ((choice[1] == 0 ? a : b)->columns[choice[0]]).field;
   869         enum FIELD fo = o->keys[i].field;
   881 #ifdef USE_TIME_FUNCTIONS   884         char *end = strptime(p, 
"%F", &tm);
   885         time_t tv = mktime(&tm);
   886         if ((tv == (time_t)-1) || ((end - p) != 10)) {
   893         assert(
sizeof(time_t) >= 8);
   895         if ((p[4] != 
'-') || (p[7] != 
'-')) {
   898         for (
int i = 0; i < 10; i++) {
   899             if ((i != 4 && i != 7) && !(
'0' <= p[i] && p[i] <= 
'9')) {
   903         long v = (((long)p[0] << 8*7) | ((long)p[1] << 8*6)
   904                   | ((long)p[2] << 8*5) | ((long)p[3] << 8*4)
   905                   | ((long)p[5] << 8*3) | ((long)p[6] << 8*2)
   906                   | ((long)p[8] << 8*1) | ((long)p[9] << 8*0));
   /* format_date(s, sz, tv): write the text form of date TV into S (buffer
      size SZ).  Two variants are selected by USE_TIME_FUNCTIONS:
      - defined: localtime_r() followed by strftime(..., "%F", ...);
      - otherwise: the date is treated as eight packed characters and
        unpacked, most significant byte first, into s[0..3], s[5..6] and
        s[8..9].  Positions 4 and 7 are not written by the visible stores
        (presumably they receive '-' on lines not shown).
      An "#undef USE_TIME_FUNCTIONS" appears earlier in this file, so the
      second variant is the one compiled unless the macro is defined again. */
   916 format_date(
char *s, 
size_t sz, time_t tv)
   918 #ifdef USE_TIME_FUNCTIONS   921         struct tm *tm = localtime_r(&tv, &tmbuf);
   925         size_t cx = strftime(s, sz, 
"%F", tm);
   /* Year digits. */
   932         s[0] = (char)((v >> 8*7) & 0xff);
   933         s[1] = (char)((v >> 8*6) & 0xff);
   934         s[2] = (char)((v >> 8*5) & 0xff);
   935         s[3] = (char)((v >> 8*4) & 0xff);
   /* Month digits. */
   937         s[5] = (char)((v >> 8*3) & 0xff);
   938         s[6] = (char)((v >> 8*2) & 0xff);
   /* Day digits. */
   940         s[8] = (char)((v >> 8*1) & 0xff);
   941         s[9] = (char)((v >> 8*0) & 0xff);
   /* year_value(date): reduce DATE to a value identifying its year.
      - With USE_TIME_FUNCTIONS: go through localtime_r()/mktime(); the
        visible lines also format and print both the input and the result
        (this looks like tracing code -- whether it is conditional is not
        visible in this excerpt).
      - Otherwise: DATE is treated as packed characters; the low four bytes
        (which hold the month and day digits in the packed encoding used by
        this file) are replaced by the characters '0','1','0','1', i.e. the
        date is moved to January 1st of the same year. */
   950 year_value(time_t date)
   952 #ifdef USE_TIME_FUNCTIONS   955         struct tm *tmx = localtime_r(&date, &tm);
   966         time_t year = mktime(&tm);
   967         assert(year != (time_t)-1);
   970         char tv0[32], tv1[32];
   972         localtime_r(&date, &tm0);
   973         strftime(tv0, 
sizeof(tv0), 
"%F", &tm0);
   974         localtime_r(&year, &tm1);
   975         strftime(tv1, 
sizeof(tv1), 
"%F", &tm1);
   976         printf(
"date=%s year=%s\n", tv0, tv1); fflush(0);
   /* M masks the low 32 bits; Z holds "0101" in those four bytes. */
   983         long m = (((long)0xff << 8*3) | ((long)0xff << 8*2)
   984                   | ((long)0xff << 8*1) | ((long)0xff << 8*0));
   985         long z = (((long)
'0' << 8*3) | ((long)
'1' << 8*2)
   986                   | ((long)
'0' << 8*1) | ((long)
'1' << 8*0));
   /* Keep the year bytes, substitute month and day. */
   987         long v = ((date & ~m) | z);
   /* get_int_column_by_index(u, columns, nth): read the NTH column as a
      long.  Asserts that the column is declared F_ZAHL.  E (the element
      being read) is obtained on a line not visible in this excerpt. */
   995 get_int_column_by_index(
struct kmr_ntuple *u, 
struct COLUMN *columns, 
int nth)
   997     assert(columns[nth].field == F_ZAHL);
   999     long v = *(
long *)(e.p);
  /* get_real_column_by_index(u, columns, nth): read the NTH column as a
     double.  Asserts that the column is declared F_REAL.  E is obtained on
     a line not visible in this excerpt. */
  1004 get_real_column_by_index(
struct kmr_ntuple *u, 
struct COLUMN *columns, 
int nth)
  1006     assert(columns[nth].field == F_REAL);
  1008     double v = *(
double *)(e.p);
  /* get_real_column(u, description, ...): read a column, looked up by name
     in DESCRIPTION, as a double.  Asserts that the column is declared
     F_REAL.
     NOTE(review): NTH is used as an array index without a visible check of
     the lookup result -- confirm what column_index_by_name returns for an
     unknown name. */
  1013 get_real_column(
struct kmr_ntuple *u, 
struct RECORD *description,
  1016     int nth = column_index_by_name(description, name);
  1017     assert(description->columns[nth].field == F_REAL);
  1019     double v = *(
double *)(e.p);
  /* get_text_column_by_index(u, columns, nth): accessor for the NTH column,
     which must be declared F_TEXT (asserted).  How the text is returned is
     on lines not visible in this excerpt. */
  1025 get_text_column_by_index(
struct kmr_ntuple *u, 
struct COLUMN *columns, 
int nth)
  1027     assert(columns[nth].field == F_TEXT);
  /* get_date_column_by_index(u, columns, nth): read the NTH column as a
     time_t.  Asserts that the column is declared F_DATE.  E is obtained on
     a line not visible in this excerpt. */
  1035 get_date_column_by_index(
struct kmr_ntuple *u, 
struct COLUMN *columns, 
int nth)
  1037     assert(columns[nth].field == F_DATE);
  1039     time_t v = *(time_t *)(e.p);
  /* get_date_column(u, description, name): read the column called NAME as a
     time_t.  Asserts that the column is declared F_DATE.
     NOTE(review): NTH is used as an array index without a visible check of
     the lookup result -- confirm what column_index_by_name returns for an
     unknown name. */
  1044 get_date_column(
struct kmr_ntuple *u, 
struct RECORD *description, 
char *name)
  1046     int nth = column_index_by_name(description, name);
  1047     assert(description->columns[nth].field == F_DATE);
  1049     time_t v = *(time_t *)(e.p);
  /* scan_columns(mr, u, bufsz, line, linesz, tbl): parse one '|'-separated
     text LINE of LINESZ bytes according to the column descriptions of TBL.
     One field is consumed per described column and converted by its
     declared type; a malformed field prints a diagnostic and calls
     MPI_Abort.  How the values are stored into U (buffer size BUFSZ) is on
     lines not visible in this excerpt. */
  1057 scan_columns(
KMR *mr, 
struct kmr_ntuple *u, 
int bufsz,
  1058              char *line, 
size_t linesz, 
struct TABLE_INFO *tbl)
  /* The table identifier, as an int (presumably used as the tuple marker). */
  1062     int marker = (int)tbl->name;
  1063     struct RECORD *description = &(tbl->description);
  1064     struct COLUMN *descs = description->columns;
  1065     int ndescs = description->ncolumns;
  1068     char * 
const end = (line + linesz);
  1070     for (
int i = 0; i < ndescs; i++) {
  /* Advance S to the next '|' or to the end of the line. */
  1072         while (s < end && *s != 
'|') {
  /* Q is the separator position, or null when the line ended first. */
  1075         char * 
const q = ((s == end) ? 0 : s);
  1077             fprintf(stderr, 
"Fewer fields in line (%s)\n", line);
  1078             MPI_Abort(MPI_COMM_WORLD, 1);
  1084         switch (descs[i].field) {
  /* Integer field; the trailing %c picks up any character following the
     number (presumably to detect junk -- the check of CC is not shown). */
  1092             cc = sscanf(p, 
"%ld%c", &v, gomi);
  1095                 fprintf(stderr, 
"Bad integer in %d-th field in line (%s)\n",
  1097                 MPI_Abort(MPI_COMM_WORLD, 1);
  /* Real field, scanned the same way. */
  1106             cc = sscanf(p, 
"%lf%c", &v, gomi);
  1109                 fprintf(stderr, 
"Bad real in %d-th field in line (%s)\n",
  1111                 MPI_Abort(MPI_COMM_WORLD, 1);
  /* Text field: its length is the distance from field start to separator. */
  1119             int len = (int)(q - p);
  /* Date field: decode_date() signals failure with (time_t)-1. */
  1124             time_t tv = decode_date(p);
  1125             if (tv == (time_t)-1) {
  1127                 fprintf(stderr, 
"Bad date in %d-th field in line (%s)\n",
  1129                 MPI_Abort(MPI_COMM_WORLD, 1);
  /* Leftover input after the last described column is only warned about. */
  1141         fprintf(stderr, 
"(warning) Extra characters in line (%s)\n", line);
  1151     char line[RECORD_SIZE];
  1152     char vbuf[RECORD_SIZE];
  1153     struct kmr_ntuple *v = (
void *)vbuf;
  1155     KMR *mr = kvo->c.mr;
  1159     char *p = (
void *)kv0.v.p;
  1160     size_t linesz = (
size_t)kv0.vlen;
  1162     assert(linesz < 
sizeof(line));
  1163     memcpy(line, p, linesz);
  1166     scan_columns(mr, v, 
sizeof(vbuf), line, linesz, tbl);
  1176     if (scanner->fn == 0) {
  1179         (*scanner->fn)(kv, kvi, kvo, scanner->arg, index);
  1184 static void load_table_files_in_memory(
int nprocs, 
int rank, 
char *directory,
  1186 static void load_one_table_file_in_memory(
int nprocs, 
int rank,
  1188                                           char *filename, _Bool singlefile);
  /* load_input_tables(nprocs, rank, directory, tbls, ntbls): read the files
     of the NTBLS tables listed in TBLS from DIRECTORY into memory via
     load_table_files_in_memory().  The loading is bracketed by MPI barriers
     and the elapsed time between them is printed (the condition guarding
     the two printf calls is not visible in this excerpt).  Every listed
     table must have a nonzero data.nread (asserted). */
  1191 load_input_tables(
int nprocs, 
int rank, 
char *directory,
  1192                   enum TABLE *tbls, 
int ntbls)
  1194     assert(directory != 0);
  1197         printf(
"reading table files (in advance)...\n");
  1200     MPI_Barrier(MPI_COMM_WORLD);
  1201     double t0 = wtime();
  1203     for (
int i = 0; i < ntbls; i++) {
  1204         struct TABLE_INFO *tbl = find_table(tbls[i]);
  1205         assert(tbl->data.nread != 0);
  1206         load_table_files_in_memory(nprocs, rank, directory, tbl);
  1209     MPI_Barrier(MPI_COMM_WORLD);
  1210     double t1 = wtime();
  1212         printf(
"reading table files (in advance) in %f sec\n", (t1 - t0));
  /* load_table_files_in_memory(nprocs, rank, directory, ...): locate and
     load the file(s) of one table, dispatching on tbl->data.nread:
       0 -- handled by the first branch (body not visible in this excerpt);
       1 -- the single file "DIRECTORY/<file>" is loaded (the condition
            selecting which rank loads it is not visible);
       2 -- "DIRECTORY/<file>" is loaded if it is readable; if it does not
            exist (ENOENT), numbered part files "DIRECTORY/<file>.<n>" are
            tried instead.
     Any access() failure other than ENOENT is reported with perror() and
     ends in MPI_Abort. */
  1218 load_table_files_in_memory(
int nprocs, 
int rank, 
char *directory,
  1224     assert(directory != 0);
  1226     if (tbl->data.nread == 0) {
  1228     } 
else if (tbl->data.nread == 1) {
  1230             cc = snprintf(filename, 
sizeof(filename), 
"%s/%s",
  1231                           directory, tbl->data.file);
  /* The path must have fit in FILENAME without truncation. */
  1232             assert(cc < (
int)
sizeof(filename));
  1233             load_one_table_file_in_memory(nprocs, rank, tbl, filename, 1);
  1235     } 
else if (tbl->data.nread == 2) {
  1236         cc = snprintf(filename, 
sizeof(filename), 
"%s/%s",
  1237                       directory, tbl->data.file);
  1238         assert(cc < (
int)
sizeof(filename));
  1239         cc = access(filename, R_OK);
  1243                 load_one_table_file_in_memory(nprocs, rank, tbl, filename, 1);
  1245         } 
else if (errno == ENOENT) {
  /* Up to 50 part files per rank; this rank's j-th candidate has suffix
     1 + j * nprocs + rank, so the suffixes are dealt out round-robin. */
  1248             for (
int j = 0; j < 50; j++) {
  1250                 int n = (1 + (j * nprocs) + rank);
  1251                 cc = snprintf(filename, 
sizeof(filename), 
"%s/%s.%d",
  1252                               directory, tbl->data.file, n);
  1253                 assert(cc < (
int)
sizeof(filename));
  1254                 cc = access(filename, R_OK);
  1256                     load_one_table_file_in_memory(nprocs, rank, tbl, filename, 0);
  /* A missing part file is not an error (the action taken is on a line not
     shown); other failures abort. */
  1258                 } 
else if (errno == ENOENT) {
  1261                     perror(
"access tbl file");
  1262                     MPI_Abort(MPI_COMM_WORLD, 1);
  1267             perror(
"access tbl file");
  1268             MPI_Abort(MPI_COMM_WORLD, 1);
  /* load_one_table_file_in_memory(nprocs, rank, tbl, filename, singlefile):
     read the whole of FILENAME into a freshly malloc'ed buffer and record
     the buffer and its size in slot tbl->data.nfiles of tbl->data.buffers /
     tbl->data.sizes.  Failures of realloc, open, fstat, malloc and read are
     reported and end in MPI_Abort.  The update of tbl->data.nfiles and
     tbl->data.nb is on lines not visible in this excerpt. */
  1277 load_one_table_file_in_memory(
int nprocs, 
int rank, 
struct TABLE_INFO *tbl,
  1278                               char *filename, _Bool singlefile)
  1282     int nth = tbl->data.nfiles;
  /* Grow the slot arrays when slot NTH does not exist yet: one slot for a
     single-file table, otherwise (nth + 1) rounded up to a multiple of 8. */
  1283     if (!(nth < tbl->data.nb)) {
  1284         int nb = (singlefile ? 1 : (((nth + 1) + 7) & ~7));
  1286         void **bb = realloc(tbl->data.buffers, (
sizeof(
void *) * (
size_t)nb));
  1288             perror(
"realloc tbl buffer");
  1289             MPI_Abort(MPI_COMM_WORLD, 1);
  /* Slots below nfiles are existing entries; the treatment of old and new
     slots is on lines not shown. */
  1292         for (
int i = 0; i < nb; i++) {
  1293             if (i < tbl->data.nfiles) {
  1299         tbl->data.buffers = bb;
  1301         size_t *ss = realloc(tbl->data.sizes, (
sizeof(
size_t) * (
size_t)nb));
  1303             perror(
"realloc tbl buffer");
  1304             MPI_Abort(MPI_COMM_WORLD, 1);
  1307         for (
int i = 0; i < nb; i++) {
  1308             if (i < tbl->data.nfiles) {
  1314         tbl->data.sizes = ss;
  /* Slot NTH must now exist and still be empty. */
  1319     assert((nth < tbl->data.nb) && (tbl->data.buffers != 0)
  1320            && (tbl->data.buffers[nth] == 0));
  1323         int fd = open(filename, O_RDONLY, 0);
  1326             snprintf(ee, 
sizeof(ee), 
"open(%s) failed", filename);
  1328             MPI_Abort(MPI_COMM_WORLD, 1);
  1336             snprintf(ee, 
sizeof(ee), 
"fstat(%s) failed", filename);
  1338             MPI_Abort(MPI_COMM_WORLD, 1);
  1342         off_t fsz = s.st_size;
  /* Buffer size: file size plus 8 spare bytes, rounded up to a multiple of
     1024. */
  1347         size_t bsz = (size_t)(((fsz + 8) + (1024 - 1)) & (~(1024 - 1)));
  1348         assert(bsz >= (
size_t)fsz);
  1349         char *b = malloc(bsz);
  /* NOTE(review): FSZ is an off_t printed with %ld here and in the report
     below; that matches only where off_t is long -- consider a cast. */
  1352             snprintf(ee, 
sizeof(ee), 
"malloc(%ld) failed", fsz);
  1354             MPI_Abort(MPI_COMM_WORLD, 1);
  1358         double t0 = wtime();
  /* Read in pieces of at most 8 MiB; RD (declared on a line not shown) is
     the number of bytes read so far. */
  1360         off_t chunk = (8 * 1024 * 1024);
  1363             size_t rr = (size_t)MIN((fsz - rd), chunk);
  1364             ssize_t cx = read(fd, (b + rd), rr);
  1367                 snprintf(ee, 
sizeof(ee), 
"read(%s) failed", filename);
  1369                 MPI_Abort(MPI_COMM_WORLD, 1);
  1377         double t1 = wtime();
  /* Publish the buffer and the file size in slot NTH. */
  1379         assert((nth == tbl->data.nfiles) && (tbl->data.buffers[nth] == 0));
  1380         tbl->data.buffers[nth] = b;
  1381         tbl->data.sizes[nth] = (size_t)fsz;
  /* Optional per-file timing report on stderr. */
  1384         if (report_time_to_read) {
  1385             fprintf(stderr, 
"[%05d] reading (%s) sz=%ld in %f sec\n",
  1386                     rank, filename, fsz, (t1 - t0));
  1392     long cnt = kvo->c.element_count;
  1393     printf(
"[%d] reading partial table (%s) n=%ld %f sec\n",
  1394            rank, filename, cnt, (t1 - t0));
  1408     KMR *mr = kvo->c.mr;
  1411     assert(tbl->data.nread != 0);
  1413     if (mr->rank == 0) {
  1414         printf(
"scanning table file (%s)...\n", tbl->data.file);
  1418     double t0 = wtime();
  1421     if (run->redistribute_loaded_tables) {
  1431     scan_table_in_memory(kvs0, tbl, &scanner);
  1433     if (run->redistribute_loaded_tables) {
  1434         assert(kvs0 != kvo);
  1438     *(tbl->data.variable) = kvo;
  1440     double t1 = wtime();
  1442     ptime(kvo, 0, 
"scanning table file", tbl->data.file, (t1 - t0));
  1449     KMR *mr = kvo->c.mr;
  1451     for (
int i = 0; i < tbl->data.nfiles; i++) {
  1452         void *b = tbl->data.buffers[i];
  1453         size_t sz = tbl->data.sizes[i];
  1454         struct kmr_option keepopen = {.keep_open = 1};
  1455         kmr_map_getline_in_memory_(mr, b, sz, 0,
  1456                                    kvo, scanner, keepopen, scan_line);
  /* scan_table_files_in_advance(mr, tbls, ntbls, ...): call
     scan_table_files() for each of the NTBLS tables listed in TBLS and, on
     rank 0, print a start message and the total elapsed time.  Every listed
     table must have a nonzero data.nread (asserted).  KVS and RUN used in
     the call are defined on lines not visible in this excerpt. */
  1462 scan_table_files_in_advance(
KMR *mr, 
enum TABLE *tbls, 
int ntbls,
  1465     if (mr->rank == 0) {
  1466         printf(
"scanning tables (in advance)...\n"); fflush(0);
  1468     double t0 = wtime();
  1470     for (
int i = 0; i < ntbls; i++) {
  1471         struct TABLE_INFO *tbl = find_table(tbls[i]);
  1472         assert(tbl->data.nread != 0);
  1474         scan_table_files(kvs, tbl->name, 0, 0, run);
  1477     double t1 = wtime();
  1478     if (mr->rank == 0) {
  1479         printf(
"scanning tables in %f sec\n", (t1 - t0)); fflush(0);
  /* dump_line(mr, line, linesz, u, descs, ndescs): render tuple U as text
     into LINE (capacity LINESZ), one field per column description, with
     "|" written between fields (the condition guarding the separator is not
     visible in this excerpt).  U must hold exactly NDESCS elements
     (asserted).  A field that cannot be formatted prints a diagnostic and
     calls MPI_Abort. */
  1487 dump_line(
KMR *mr, 
char *line, 
int linesz, 
struct kmr_ntuple *u,
  1488           struct COLUMN descs[], 
int ndescs)
  1491     assert(u->n == ndescs);
  /* END is one past the buffer; Q (declared on a line not shown) is the
     write position, so (end - q) is the space that remains. */
  1494     char *end = &line[linesz];
  1495     for (
int i = 0; i < ndescs; i++) {
  1497             cc = snprintf(q, (
size_t)(end - q), 
"|");
  1501         switch (descs[i].field) {
  /* Integer field: the element must be exactly one long. */
  1507             assert(p.len == 
sizeof(
long));
  1508             long v = *(
long *)(p.p);
  1509             cc = snprintf(q, (
size_t)(end - q), 
"%ld", v);
  1511                 fprintf(stderr, 
"Bad integer in %d-th field\n", i);
  1512                 MPI_Abort(MPI_COMM_WORLD, 1);
  /* Real field: the element must be exactly one double. */
  1519             assert(p.len == 
sizeof(
double));
  1520             double v = *(
double *)(p.p);
  1521             cc = snprintf(q, (
size_t)(end - q), 
"%lf", v);
  1523                 fprintf(stderr, 
"Bad real in %d-th field\n", i);
  1524                 MPI_Abort(MPI_COMM_WORLD, 1);
  /* Text field: at most p.len characters are copied (snprintf is given
     p.len + 1 as its size), and the remaining space must exceed that. */
  1532             assert((
size_t)(p.len + 1) < (
size_t)(end - q));
  1533             cc = snprintf(q, (
size_t)(p.len + 1), 
"%s", v);
  1535                 fprintf(stderr, 
"Bad text in %d-th field\n", i);
  1536                 MPI_Abort(MPI_COMM_WORLD, 1);
  /* Advance by what was actually stored, never more than p.len. */
  1540             q += MIN(p.len, cc);
  /* Date field: the element must be exactly one time_t. */
  1544             assert(p.len == 
sizeof(time_t));
  1546             size_t cx = format_date(q, (
size_t)(end - q), *v);
  1548                 fprintf(stderr, 
"Bad date in %d-th field\n", i);
  1549                 MPI_Abort(MPI_COMM_WORLD, 1);
  1564     KMR *mr = kvi->c.mr;
  1565     char line[RECORD_SIZE];
  1566     char buf[RECORD_SIZE];
  1568     struct kmr_ntuple *u = (
void *)kv0.v.p;
  1569     struct RECORD *d = find_description((
enum TABLE)u->marker);
  1571     if (d->nkeys == 1) {
  1572         struct kmr_ntuple *k = (
void *)buf;
  1575         dump_line(kvi->c.mr, line, (
int)
sizeof(line), k,
  1577         printf(
"%s:", line);
  1579         struct kmr_ntuple *k = (
void *)kv0.k.p;
  1580         dump_line(kvi->c.mr, line, (
int)
sizeof(line), k, d->keys, d->nkeys);
  1581         printf(
"%s:", line);
  1584     dump_line(mr, line, (
int)
sizeof(line), u, d->columns, d->ncolumns);
  1585     printf(
"%s\n", line);
  1594     KMR *mr = kvi->c.mr;
  1595     char line[RECORD_SIZE];
  1597     struct kmr_ntuple *u = (
void *)kv0.v.p;
  1598     struct RECORD *d = find_description((
enum TABLE)u->marker);
  1600     dump_line(mr, line, (
int)
sizeof(line), u, d->columns, d->ncolumns);
  1602     printf(
"%s\n", line);
  /* dump_table(kvs, table): looks up the record description of TABLE and
     prepares a KMR option with .nothreading and .inspect set; the call that
     uses them is on lines not visible in this excerpt (presumably a map
     over KVS that prints each record -- confirm). */
  1607 dump_table(
KMR_KVS *kvs, 
enum TABLE table)
  1609     struct RECORD *d = find_description(table);
  1610     struct kmr_option inspect = {.nothreading = 1, .inspect = 1};
  1614 #define CREATE_KVS(MR, KEY, PUSHOFF) \  1615     create_kvs1((MR), (KEY), KMR_KV_OPAQUE, \  1616                 (PUSHOFF), __FILE__, __LINE__, __func__)  1621             const char *file, 
const int line, 
const char *func)
  1626                                      kmr_noopt, file, line, func);
  1629                               kmr_noopt, file, line, func);
  1641     struct kmr_ntuple *u = (
void *)kv0.v.p;
  /* select_by_fields(kv0, ..., p, i): map function driven by a struct
     SELECT passed through P.  It projects the value tuple of KV0 onto
     selector->columns and derives a key according to selector->nkeys
     (0, 1 or more; the bodies of the first two cases are not visible in
     this excerpt).  The incoming tuple must carry the marker of the
     selector's input table (asserted).  Key and value tuples are built in
     stack buffers of RECORD_SIZE bytes.
     NOTE(review): KBUF/VBUF are plain char arrays viewed as struct
     kmr_ntuple -- confirm the alignment is adequate. */
  1656 select_by_fields(
const struct kmr_kv_box kv0,
  1658                  void *p, 
const long i)
  1660     char kbuf[RECORD_SIZE];
  1661     struct kmr_ntuple *k = (
void *)kbuf;
  1662     char vbuf[RECORD_SIZE];
  1663     struct kmr_ntuple *v = (
void *)vbuf;
  1665     struct SELECT *selector = p;
  1666     KMR *mr = kvo->c.mr;
  1667     struct kmr_ntuple *u = (
void *)kv0.v.p;
  1668     assert(u->marker == (
int)selector->input);
  1672     if (selector->nkeys == 0) {
  1675     } 
else if (selector->nkeys == 1) {
  /* Build the key tuple K from the selected key columns of U. */
  1681         put_columns_by_indexes(mr, k, 
sizeof(kbuf), u,
  1682                                selector->keys, selector->nkeys);
  /* Build the value tuple V from the selected columns of U. */
  1688     put_columns_by_indexes(mr, v, 
sizeof(vbuf), u,
  1689                            selector->columns, selector->ncolumns);
  /* join_by_fields(kv, n, ...): reduce function driven by a struct PRODUCT
     (PRODUCER).  The visible lines pass the producer's column and key
     selections to a call whose name is not visible in this excerpt,
     optionally trace the two group sizes cnt[0] and cnt[1] on stderr, and
     assert the cardinality constraints requested by the producer flags. */
  1705 join_by_fields(
const struct kmr_kv_box kv[], 
const long n,
  1709     struct kmr_ntuple **vv[2];
  1713                         producer->columns, producer->ncolumns,
  1714                         producer->keys, producer->nkeys);
  /* Unconditional trace of both counts. */
  1716     if (producer->trace_product) {
  1717         fprintf(stderr, 
"prod %ld %ld\n", cnt[0], cnt[1]); fflush(0);
  /* Trace only when both sides are non-empty. */
  1719     if (producer->trace_product_nonempty && cnt[0] != 0 && cnt[1] != 0) {
  1720         fprintf(stderr, 
"prod %ld %ld\n", cnt[0], cnt[1]); fflush(0);
  /* Each cntK_* flag, when set, requires cnt[K] to be 0, 1, 0-or-1, or
     nonzero respectively. */
  1722     assert((!producer->cnt0_zero || cnt[0] == 0)
  1723            && (!producer->cnt0_one || cnt[0] == 1)
  1724            && (!producer->cnt0_zero_one || (cnt[0] == 0 || cnt[0] == 1))
  1725            && (!producer->cnt0_nonzero || cnt[0] != 0));
  1726     assert((!producer->cnt1_zero || cnt[1] == 0)
  1727            && (!producer->cnt1_one || cnt[1] == 1)
  1728            && (!producer->cnt1_zero_one || (cnt[1] == 0 || cnt[1] == 1))
  1729            && (!producer->cnt1_nonzero || cnt[1] != 0));
  /* Single-input join/aggregation helper: reduces the key-values of
     INPUT0 with the reduce function JOIN (ARG is passed through to it)
     into a new KVS whose key field is OUTPUTKF.  MSG labels the timing
     and count reports; PUSHOFF is forwarded to CREATE_KVS for the output.
     NOTE(review): the header line with the function name and the first
     parameters is not visible; the parameter list matches the JOIN1(...)
     call sites.  The statement between t0 and t1 is also not visible --
     presumably the shuffle of input0 into kvs1, given the "shuffle"
     label -- TODO confirm. */
  1738       kmr_redfn_t join, 
void *arg, 
char *msg, _Bool pushoff)
  1740     KMR *mr = input0->c.mr;
  1744     ptime(input0, 0, 0, 0, 0.0);
  1745     double t0 = wtime();
  /* Intermediate KVS keeps the key field type of the input. */
  1746     KMR_KVS *kvs1 = CREATE_KVS(mr, input0->c.key_data, 0);
  1748     double t1 = wtime();
  /* Report the elapsed time (t1 - t0) under the "shuffle" label. */
  1749     ptime(kvs1, 0, 
"shuffle", msg, (t1 - t0));
  1750     pcount(kvs1, 0, msg, 0);
  1751     KMR_KVS *kvs2 = CREATE_KVS(mr, outputkf, pushoff);
  1752     kmr_reduce(kvs1, kvs2, arg, kmr_noopt, join);
  1753     pcount(kvs2, 0, msg, 1);
  /* Two-input join helper with a caller-supplied reduce function JOIN
     (ARG is passed through to it).  Both inputs must have the same key
     field type.  M labels the reports; the count reports use the
     formatted label "join (M)".  PUSHOFF is forwarded to CREATE_KVS for
     the final output.
     NOTE(review): the header line (name, first parameters), the
     declarations of msg and inputkf, and the statements that fill kvs0,
     kvs1 and kvs2 are not visible in this listing; the parameter list
     matches the JOIN2(...) call site. */
  1759       kmr_redfn_t join, 
void *arg, 
char *m, _Bool pushoff)
  1761     KMR *mr = input0->c.mr;
  1763     snprintf(msg, 
sizeof(msg), 
"join (%s)", m);
  1765     ptime(input0, input1, 0, 0, 0.0);
  1766     double t0 = wtime();
  1767     assert(input0->c.key_data == input1->c.key_data);
  /* One intermediate KVS per input. */
  1769     KMR_KVS *kvs0 = CREATE_KVS(mr, inputkf, 0);
  1771     KMR_KVS *kvs1 = CREATE_KVS(mr, inputkf, 0);
  1773     double t1 = wtime();
  1774     ptime(kvs0, kvs1, 
"shuffle", m, (t1 - t0));
  /* kvs2 is created next to the pair vec[]; presumably it receives the
     combination of kvs0 and kvs1 -- TODO confirm. */
  1775     KMR_KVS *vec[] = {kvs0, kvs1};
  1776     KMR_KVS *kvs2 = CREATE_KVS(mr, inputkf, 0);
  1778     pcount(kvs2, 0, msg, 0);
  1779     KMR_KVS *kvs3 = CREATE_KVS(mr, outputkf, pushoff);
  1780     kmr_reduce(kvs2, kvs3, arg, kmr_noopt, join);
  1781     pcount(kvs3, 0, msg, 1);
  /* Two-input join helper driven by a struct PRODUCT descriptor: same
     visible sequence as the generic two-input helper, but the reduce
     function is fixed to join_by_fields() and the descriptor JOIN is
     passed as its argument.
     NOTE(review): the header line (name, first parameters), the
     declarations of msg and inputkf, and the statements that fill kvs0,
     kvs1 and kvs2 are not visible in this listing; the parameter list
     matches the JOINP(...) call sites. */
  1787       struct PRODUCT *join, 
char *m, _Bool pushoff)
  1789     KMR *mr = input0->c.mr;
  1791     snprintf(msg, 
sizeof(msg), 
"join (%s)", m);
  1793     ptime(input0, input1, 0, 0, 0.0);
  1794     double t0 = wtime();
  /* Both inputs must be keyed with the same key field type. */
  1795     assert(input0->c.key_data == input1->c.key_data);
  1797     KMR_KVS *kvs0 = CREATE_KVS(mr, inputkf, 0);
  1799     KMR_KVS *kvs1 = CREATE_KVS(mr, inputkf, 0);
  1801     double t1 = wtime();
  1802     ptime(kvs0, kvs1, 
"shuffle", m, (t1 - t0));
  1803     KMR_KVS *vec[] = {kvs0, kvs1};
  1804     KMR_KVS *kvs2 = CREATE_KVS(mr, inputkf, 0);
  1806     pcount(kvs2, 0, msg, 0);
  1807     KMR_KVS *kvs3 = CREATE_KVS(mr, outputkf, pushoff);
  /* The descriptor itself is the reduce argument. */
  1808     kmr_reduce(kvs2, kvs3, join, kmr_noopt, join_by_fields);
  1809     pcount(kvs3, 0, msg, 1);
  /* Descriptors for the q7 selections (struct SELECT) and joins (struct
     PRODUCT).  In .columns/.keys of a PRODUCT each entry is a pair
     {column index, FST|SND}; FST/SND presumably pick the first or second
     table of .inputs -- TODO confirm.  Several initializer lines are not
     visible in this listing. */
  1893 static struct SELECT q7_select_s = {
  1895     .output = TAB_Q7_S1,
  1903 static struct SELECT q7_select_c = {
  1905     .output = TAB_Q7_C1,
  1913 static struct SELECT q7_select_o = {
  1915     .output = TAB_Q7_O1,
  1923 static struct PRODUCT q7_join_nn_s = {
  1924     .inputs = {TAB_Q7_S1, TAB_Q7_NN},
  1925     .output = TAB_Q7_NNS,
  1928     .columns = {{2, SND}, {1, FST}, {1, SND}, {3, SND}},
  1930     .keys = {{2, SND}, {1, FST}},
  1934 static struct PRODUCT q7_join_c_o = {
  1935     .inputs = {TAB_Q7_C1, TAB_Q7_O1},
  1936     .output = TAB_Q7_CO,
  1939     .columns = {{1, SND}, {1, FST}},
  1945 static struct PRODUCT q7_join_l_co = {
  1946     .inputs = {TAB_Q7_L1, TAB_Q7_CO},
  1947     .output = TAB_Q7_CLO,
  1950     .columns = {{1, SND}, {1, FST}, {2, FST}, {3, FST}},
  1952     .keys = {{1, SND}, {1, FST}},
  1954     .trace_product_nonempty = 0
  /* NOTE(review): unlike the other descriptors here, this one is not
     declared static -- confirm whether external linkage is intended. */
  1957 struct PRODUCT q7_join_clo_nns = {
  1958     .inputs = {TAB_Q7_CLO, TAB_Q7_NNS},
  1959     .output = TAB_Q7_CLNNOS0,
  1962     .columns = {{2, SND}, {3, SND}, {2, FST}, {3, FST}},
  1967     .trace_product_nonempty = 0
  /* Map function over nation tuples (TAB_N): emits a record holding
     n_nationkey and n_name for tuples whose name starts with "FRANCE" or
     "GERMANY".
     NOTE(review): the header line with the function name is not visible;
     this is presumably q7_select_nations, the map function q7() applies
     to the nation table.  The definition of name and the construction of
     the key k are not visible either. */
  1972                   KMR_KVS *kvo, 
void *p, 
const long i)
  1974     char kbuf[RECORD_SIZE];
  1975     struct kmr_ntuple *k = (
void *)kbuf;
  1976     char vbuf[RECORD_SIZE];
  1977     struct kmr_ntuple *v = (
void *)vbuf;
  1979     enum TABLE input = TAB_N;
  1980     enum TABLE output = TAB_Q7_N1;
  1982     struct RECORD *d = find_description(input);
  1984     KMR *mr = kvo->c.mr;
  1985     struct kmr_ntuple *u = (
void *)kv0.v.p;
  1988     char *s0 = 
"FRANCE";
  1989     char *s1 = 
"GERMANY";
  /* NOTE(review): the comparison covers only strlen() bytes of the
     constant, so it is a prefix match and name.len is not consulted
     (q21_select_n_by_name also checks the length) -- confirm intent. */
  1990     if (strncmp((
char *)name.p, s0, strlen(s0)) == 0
  1991         || strncmp((
char *)name.p, s1, strlen(s1)) == 0) {
  1992         char *cols[] = {
"n_nationkey", 
"n_name"};
  1996         put_columns_by_names(mr, v, 
sizeof(vbuf), u, d, cols, 2);
  1997         add_record(kvo, k, v);
  /* Reduce function pairing two nation tuples: from kv[0] and kv[1] it
     emits two records whose values hold (n_nationkey, n_name) of both
     tuples, once in the order (u0, u1) and once in the order (u1, u0).
     NOTE(review): the statements guarding the two emits and building
     the key k are not visible in this listing. */
  2004 q7_pair_names(
const struct kmr_kv_box kv[], 
const long n,
  2007     char kbuf[RECORD_SIZE];
  2008     struct kmr_ntuple *k = (
void *)kbuf;
  2009     char vbuf[RECORD_SIZE];
  2010     struct kmr_ntuple *v = (
void *)vbuf;
  2012     enum TABLE input = TAB_N;
  2013     enum TABLE output = TAB_Q7_NN;
  2015     struct RECORD *d = find_description(input);
  2016     char *cols[] = {
"n_nationkey", 
"n_name"};
  2018     KMR *mr = kvo->c.mr;
  /* Only the first two key-values of the group are read here. */
  2020     struct kmr_ntuple *u0 = (
void *)kv[0].v.p;
  2021     struct kmr_ntuple *u1 = (
void *)kv[1].v.p;
  /* First record: columns of u0 followed by columns of u1. */
  2029         put_columns_by_names(mr, v, 
sizeof(vbuf), u0, d, cols, 2);
  2030         put_columns_by_names(mr, v, 
sizeof(vbuf), u1, d, cols, 2);
  2031         add_record(kvo, k, v);
  /* Second record: the same columns in the swapped order. */
  2040         put_columns_by_names(mr, v, 
sizeof(vbuf), u1, d, cols, 2);
  2041         put_columns_by_names(mr, v, 
sizeof(vbuf), u0, d, cols, 2);
  2042         add_record(kvo, k, v);
  /* Map function over lineitem tuples (TAB_L): keeps tuples whose
     l_shipdate lies in the inclusive range [dt[0], dt[1]] and appends
     volume = l_extendedprice * (1 - l_discount) to the output value.
     NOTE(review): the definition of dt is not visible; presumably it is
     the two-element date array passed as P by q7() -- TODO confirm.  The
     key construction and the other value columns are not visible. */
  2051 q7_select_by_date(
const struct kmr_kv_box kv0,
  2053                   void *p, 
const long i)
  2055     char kbuf[RECORD_SIZE];
  2056     struct kmr_ntuple *k = (
void *)kbuf;
  2057     char vbuf[RECORD_SIZE];
  2058     struct kmr_ntuple *v = (
void *)vbuf;
  2060     enum TABLE input = TAB_L;
  2061     enum TABLE output = TAB_Q7_L1;
  2063     struct RECORD *d = find_description(input);
  2064     struct RECORD *dx = find_description(output);
  2067     KMR *mr = kvo->c.mr;
  2069     struct kmr_ntuple *u = (
void *)kv0.v.p;
  2070     assert(u->marker == (
int)input);
  2072     time_t shipdate = get_date_column(u, d, 
"l_shipdate");
  /* Both bounds are inclusive. */
  2073     if (dt[0] <= shipdate && shipdate <= dt[1]) {
  2077         time_t year = year_value(shipdate);
  2079         double extendedprice = get_real_column(u, d, 
"l_extendedprice");
  2080         double discount = get_real_column(u, d, 
"l_discount");
  2081         double volume = (extendedprice * (1 - discount));
  2089         kmr_put_ntuple(mr, v, (
int)
sizeof(vbuf), &volume, 
sizeof(volume));
  2090         add_record(kvo, k, v);
  /* Map function that re-keys TAB_Q7_CLNNOS0 tuples for sorting: it
     prepares a zero-padded NAME_SIZE buffer holding name1 followed by
     name2, converts the year (column 2) with htonll_(), appends that
     value to the key, and emits the input tuple unchanged as the value.
     NOTE(review): the definitions of name1/name2 and the statement that
     puts nbuf into the key are not visible.  htonll_() presumably yields
     big-endian bytes so that bytewise key comparison orders by year --
     TODO confirm. */
  2097 q7_make_sort_keys(
const struct kmr_kv_box kv0,
  2099                   void *p, 
const long i)
  2101     char kbuf[RECORD_SIZE];
  2102     struct kmr_ntuple *k = (
void *)kbuf;
  2104     enum TABLE input = TAB_Q7_CLNNOS0;
  2106     struct RECORD *d = find_description(input);
  2108     KMR *mr = kvo->c.mr;
  2109     struct kmr_ntuple *u = (
void *)kv0.v.p;
  2110     assert(u->marker == (
int)input);
  2114     time_t year = get_date_column_by_index(u, d->columns, 2);
  /* The two names must fit together into one NAME_SIZE buffer. */
  2116     char nbuf[NAME_SIZE];
  2117     assert((name1.len + name2.len) <= NAME_SIZE);
  2118     memset(nbuf, 0, 
sizeof(nbuf));
  2119     memcpy(&nbuf[0], name1.p, name1.len);
  2120     memcpy(&nbuf[name1.len], name2.p, name2.len);
  2121     uint64_t beyear = htonll_((uint64_t)(year));
  2125     kmr_put_ntuple(mr, k, (
int)
sizeof(kbuf), &beyear, 
sizeof(beyear));
  /* The value is the original input tuple. */
  2126     add_record(kvo, k, (
void *)kv0.v.p);
  /* Reduce function computing the revenue of one key group: reads the
     volume (column 3) of every tuple in the group and emits one record
     whose value is columns 0..2 of the first tuple followed by revenue;
     the key of the first key-value is reused as the output key.
     NOTE(review): the declaration of revenue and the accumulation
     statement inside the loop are not visible in this listing. */
  2132 q7_sum_volume(
const struct kmr_kv_box kv[], 
const long n,
  2135     char vbuf[RECORD_SIZE];
  2136     struct kmr_ntuple *v = (
void *)vbuf;
  /* NOTE(review): the description is looked up for TAB_Q7_CLNNOS1 while
     the assertion below checks the marker TAB_Q7_CLNNOS0 -- confirm that
     the two tables share a column layout. */
  2138     enum TABLE input = TAB_Q7_CLNNOS1;
  2139     enum TABLE output = TAB_Q7_REVENUE;
  2140     struct RECORD *d = find_description(input);
  2142     KMR *mr = kvo->c.mr;
  2143     struct kmr_ntuple *u0 = (
void *)kv[0].v.p;
  2144     assert(u0->marker == TAB_Q7_CLNNOS0);
  2148     for (
long i = 0; i < n; i++) {
  2149         struct kmr_ntuple *u = (
void *)kv[i].v.p;
  2150         double volume = get_real_column_by_index(u, d->columns, 3);
  /* Column indexes copied from the first tuple into the output. */
  2154     int cols[] = {0, 1, 2};
  2156     assert(d->nkeys >= 2);
  2158     put_columns_by_indexes(mr, v, 
sizeof(vbuf), u0, cols, 3);
  2159     kmr_put_ntuple(mr, v, (
int)
sizeof(vbuf), &revenue, 
sizeof(revenue));
  2160     add_record(kvo, (
void *)kv[0].k.p, v);
  /* Driver of query q7.  Visible pipeline: select nations and pair them
     (n+n), select suppliers and join (nn+s), select customers and orders
     and join (c+o), select lineitems by ship date and join (co+l), join
     (clo+nns), build sort keys, sum the volume per key, and sort.
     Each input table is either mapped from an already loaded KVS (N0,
     S0, C0, O0, L0) when run->load_tables_in_advance is set, or read via
     scan_table_files() otherwise.  Progress lines are printed by rank 0.
     NOTE(review): several lines (else branches, intermediate calls,
     frees, result reporting) are not visible in this listing. */
  2166 q7(
KMR *mr, 
struct RUN *run)
  /* Normalize run->pushoff to 0/1. */
  2168     _Bool pushoff = ((run->pushoff == 0) ? 0 : 1);
  2169     struct kmr_option rankzero = {.rank_zero = 1};
  2172         if (mr->rank == 0) {printf(
"q7 (with push-off)...\n"); fflush(0);}
  2174         if (mr->rank == 0) {printf(
"q7...\n"); fflush(0);}
  2179     if (mr->rank == 0) {printf(
"q7 (n+n)...\n"); fflush(0);}
  2181     if (run->load_tables_in_advance) {
  2185         scan_table_files(n_, TAB_N, 0, 0, run);
  /* Nation pairs: select the nations, then reduce them into pairs. */
  2188     KMR_KVS *n1 = CREATE_KVS(mr, KMR_KV_OPAQUE, 0);
  2189     kmr_map(N0, n1, 0, kmr_noopt, q7_select_nations);
  2190     KMR_KVS *n2 = CREATE_KVS(mr, KMR_KV_OPAQUE, 0);
  2192     pcount(n2, 0, 
"join (n+n)", 0);
  2193     KMR_KVS *nn = CREATE_KVS(mr, KMR_KV_OPAQUE, 0);
  2194     kmr_reduce(n2, nn, 0, kmr_noopt, q7_pair_names);
  2195     pcount(nn, 0, 
"join (n+n)", 1);
  2197     if (mr->rank == 0) {printf(
"q7 (s)...\n"); fflush(0);}
  2199     KMR_KVS *s1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
  2200     if (run->load_tables_in_advance) {
  2201         kmr_map(S0, s1, &q7_select_s, kmr_noopt, select_by_fields);
  2203         scan_table_files(s1, TAB_S, select_by_fields, &q7_select_s, run);
  2206     if (mr->rank == 0) {printf(
"q7 (nn+s)...\n"); fflush(0);}
  2208     KMR_KVS *nns = JOINP(s1, nn, KMR_KV_OPAQUE,
  2209                          &q7_join_nn_s, 
"nn+s", pushoff);
  2213     if (mr->rank == 0) {printf(
"q7 (c)...\n"); fflush(0);}
  2215     KMR_KVS *c1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
  2216     if (run->load_tables_in_advance) {
  2217         kmr_map(C0, c1, &q7_select_c, kmr_noopt, select_by_fields);
  2219         scan_table_files(c1, TAB_C, select_by_fields, &q7_select_c, run);
  2222     if (mr->rank == 0) {printf(
"q7 (o)...\n"); fflush(0);}
  2224     KMR_KVS *o1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
  2225     if (run->load_tables_in_advance) {
  2226         kmr_map(O0, o1, &q7_select_o, kmr_noopt, select_by_fields);
  2228         scan_table_files(o1, TAB_O, select_by_fields, &q7_select_o, run);
  2231     if (mr->rank == 0) {printf(
"q7 (c+o)...\n"); fflush(0);}
  2233     KMR_KVS *co = JOINP(c1, o1, KMR_KV_OPAQUE,
  2234                         &q7_join_c_o, 
"c+o", pushoff);
  2236     if (mr->rank == 0) {printf(
"q7 (l)...\n"); fflush(0);}
  /* Ship-date window handed to q7_select_by_date(), which treats both
     bounds as inclusive. */
  2239     tv[0] = decode_date(
"1995-01-01");
  2240     tv[1] = decode_date(
"1996-12-31");
  2241     assert((tv[0] != (time_t)-1) && (tv[1] != (time_t)-1));
  2242     KMR_KVS *l1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
  2243     if (run->load_tables_in_advance) {
  2244         kmr_map(L0, l1, tv, kmr_noopt, q7_select_by_date);
  2246         scan_table_files(l1, TAB_L, q7_select_by_date, tv, run);
  2251     if (mr->rank == 0) {printf(
"q7 (co+l)...\n"); fflush(0);}
  2253     KMR_KVS *clo = JOINP(l1, co, KMR_KV_OPAQUE,
  2254                          &q7_join_l_co, 
"co+l", pushoff);
  2258     if (mr->rank == 0) {printf(
"q7 (clo+nns)...\n"); fflush(0);}
  /* The last join and the aggregation pass 0 as the pushoff argument. */
  2260     KMR_KVS *clnnos2 = JOINP(clo, nns, KMR_KV_OPAQUE,
  2261                              &q7_join_clo_nns, 
"clo+nns", 0);
  2262     KMR_KVS *clnnos3 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
  2263     kmr_map(clnnos2, clnnos3, 0, kmr_noopt, q7_make_sort_keys);
  2265     KMR_KVS *revenue0 = JOIN1(clnnos3, KMR_KV_OPAQUE,
  2266                               q7_sum_volume, 0, 
"sum(volume)", 0);
  2267     KMR_KVS *revenue1 = CREATE_KVS(mr, KMR_KV_OPAQUE, 0);
  2268     kmr_sort(revenue0, revenue1, kmr_noopt);
  /* Join descriptors (struct PRODUCT) for q9.  Each names its two input
     tables, the output table, and the output columns as pairs {column
     index, FST|SND}; FST/SND presumably pick the first or second table
     of .inputs -- TODO confirm.  Several initializer lines are not
     visible in this listing. */
  2329 static struct PRODUCT q9_join_p_h = {
  2330     .inputs = {TAB_P, TAB_H},
  2331     .output = TAB_Q9_PH,
  2334     .columns = {{0, SND}, {1, SND}, {3, SND}},
  2342 static struct PRODUCT q9_join_s_n = {
  2343     .inputs = {TAB_S, TAB_N},
  2344     .output = TAB_Q9_NS,
  2347     .columns = {{1, SND}, {0, FST}},
  2354 static struct PRODUCT q9_join_hp_ns = {
  2355     .inputs = {TAB_Q9_PH, TAB_Q9_NS},
  2356     .output = TAB_Q9_PHSN,
  2359     .columns = {{0, SND}, {0, FST}, {1, FST}, {2, FST}},
  2361     .keys = {{0, FST}, {1, FST}},
  2366 static struct PRODUCT q9_join_l_o = {
  2367     .inputs = {TAB_L, TAB_O},
  2368     .output = TAB_Q9_LO,
  2372     .columns = {{6, FST}, {5, FST}, {1, FST}, {4, FST}, {2, FST}, {4, SND}},
  2374     .keys = {{1, FST}, {2, FST}},
  2379 static struct PRODUCT q9_join_hnps_lo = {
  2380     .inputs = {TAB_Q9_PHSN, TAB_Q9_LO},
  2381     .output = TAB_Q9_PHSNLO,
  2385     .columns = {{0, SND}, {1, SND}, {3, SND}, {0, FST}, {5, SND}, {3, FST}},
  /* Map function over part tuples (TAB_P): searches a text field for the
     substring "green", bounded by the field length.
     NOTE(review): the definition of e and the code acting on pos are not
     visible; presumably e is column 1 (asserted to be F_TEXT) and the
     tuple is emitted when pos is non-null -- TODO confirm. */
  2394 q9_select_by_name(
const struct kmr_kv_box kv0,
  2396                   void *p, 
const long i)
  2398     enum TABLE input = TAB_P;
  2399     struct RECORD *d0 = find_description(input);
  2400     assert(d0->columns[1].field == F_TEXT);
  2402     struct kmr_ntuple *u = (
void *)kv0.v.p;
  /* Length-bounded search: the search is limited to e.len bytes. */
  2404     char *pos = strnstr_(e.p, 
"green", (
size_t)e.len);
  /* Map function over TAB_Q9_PHSNLO tuples: computes
       amount = l_extendedprice * (1 - l_discount)
                - l_supplycost * l_quantity
     and emits a record whose key receives the htonll_() form of the
     negated order year and whose value receives amount.
     NOTE(review): the definition of n_name, the statements that put the
     name buffer and other columns into k and v, and the braces around
     the nbuf scope are not visible.  Negating the year presumably makes
     the later sort descending by year -- TODO confirm. */
  2412 q9_calculate_amount(
const struct kmr_kv_box kv0,
  2414                     void *p, 
const long i)
  2416     char kbuf[RECORD_SIZE];
  2417     struct kmr_ntuple *k = (
void *)kbuf;
  2418     char vbuf[RECORD_SIZE];
  2419     struct kmr_ntuple *v = (
void *)vbuf;
  2421     enum TABLE inputoutput = TAB_Q9_PHSNLO;
  2422     struct RECORD *d = find_description(inputoutput);
  2424     KMR *mr = kvo->c.mr;
  2425     struct kmr_ntuple *u = (
void *)kv0.v.p;
  /* Columns are read by position in the joined tuple. */
  2426     double l_discount = get_real_column_by_index(u, d->columns, 0);
  2427     double l_extendedprice = get_real_column_by_index(u, d->columns, 1);
  2428     double l_quantity = get_real_column_by_index(u, d->columns, 2);
  2430     time_t o_orderdate = get_date_column_by_index(u, d->columns, 4);
  2431     double l_supplycost = get_real_column_by_index(u, d->columns, 5);
  2433     time_t year = year_value(o_orderdate);
  2435     double amount = ((l_extendedprice * (1 - l_discount))
  2436                      - (l_supplycost * l_quantity));
  /* Zero-padded copy of the nation name; it must fit in NAME_SIZE. */
  2439         char nbuf[NAME_SIZE];
  2440         assert(n_name.len <= NAME_SIZE);
  2441         memset(nbuf, 0, 
sizeof(nbuf));
  2442         memcpy(nbuf, n_name.p, n_name.len);
  2443         uint64_t beyear = htonll_((uint64_t)(-year));
  2447         kmr_put_ntuple(mr, k, (
int)
sizeof(kbuf), &beyear, 
sizeof(beyear));
  2454         kmr_put_ntuple(mr, v, (
int)
sizeof(vbuf), &amount, 
sizeof(amount));
  2456     add_record(kvo, k, v);
  /* Reduce function over TAB_Q9_AMOUNT tuples: reads the amount (column
     2) of every tuple in the key group and then revisits the first tuple
     of the group.
     NOTE(review): the accumulation statement, the output construction
     and the emit are not visible in this listing; presumably the amounts
     are summed into one output record -- TODO confirm. */
  2462 q9_sum_amount(
const struct kmr_kv_box kv[], 
const long n,
  2465     char vbuf[RECORD_SIZE];
  2466     struct kmr_ntuple *v = (
void *)vbuf;
  2468     enum TABLE inputoutput = TAB_Q9_AMOUNT;
  2469     struct RECORD *d = find_description(inputoutput);
  2471     KMR *mr = kvo->c.mr;
  2475     for (
long i = 0; i < n; i++) {
  2476         struct kmr_ntuple *u = (
void *)kv[i].v.p;
  2477         double amount = get_real_column_by_index(u, d->columns, 2);
  /* The first tuple of the group is used after the loop. */
  2482         struct kmr_ntuple *u = (
void *)kv[0].v.p;
  /* Driver of query q9.  Visible pipeline: select parts by name and join
     with TAB_H (p+ps); key suppliers and join with nations (s+n); join
     the two results; key lineitems and orders and join them (l+o); join
     everything, compute the amount per tuple, sum per key, and sort.
     Tables are mapped from preloaded KVSes (P0, H0, S0, N0, L0, O0) when
     run->load_tables_in_advance is set, or read via scan_table_files()
     otherwise.  Progress lines are printed by rank 0.
     NOTE(review): several lines (else branches, the definitions of nth,
     frees, result reporting) are not visible in this listing. */
  2504 q9(
KMR *mr, 
struct RUN *run)
  /* Normalize run->pushoff to 0/1. */
  2506     _Bool pushoff = ((run->pushoff == 0) ? 0 : 1);
  2510         if (mr->rank == 0) {printf(
"q9 (with push-off)...\n"); fflush(0);}
  2512         if (mr->rank == 0) {printf(
"q9...\n"); fflush(0);}
  2517     if (mr->rank == 0) {printf(
"q9 (p+ps)...\n"); fflush(0);}
  2519     if (run->load_tables_in_advance) {
  2522         KMR_KVS *h0 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
  2523         scan_table_files(h0, TAB_H, 0, 0, run);
  2526     KMR_KVS *p1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
  2527     if (run->load_tables_in_advance) {
  2528         kmr_map(P0, p1, 0, kmr_noopt, q9_select_by_name);
  2530         scan_table_files(p1, TAB_P, q9_select_by_name, 0, run);
  2533     KMR_KVS *pps2 = JOINP(p1, H0, KMR_KV_OPAQUE,
  2534                           &q9_join_p_h, 
"p+ps", pushoff);
  2538     if (mr->rank == 0) {printf(
"q9 (s+n)...\n"); fflush(0);}
  2540     if (run->load_tables_in_advance) {
  2543         KMR_KVS *n0 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
  2544         scan_table_files(n0, TAB_N, 0, 0, run);
  /* key_by_nth receives &nth; the value of nth is set on lines not
     visible here. */
  2547     KMR_KVS *s0x = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
  2549     if (run->load_tables_in_advance) {
  2550         kmr_map(S0, s0x, &nth, kmr_noopt, key_by_nth);
  2552         scan_table_files(s0x, TAB_S, key_by_nth, &nth, run);
  2555     KMR_KVS *sn2 = JOINP(s0x, N0, KMR_KV_OPAQUE,
  2556                          &q9_join_s_n, 
"n+s", pushoff);
  2560     if (mr->rank == 0) {printf(
"q9 (p+ps)+(s+n)...\n"); fflush(0);}
  2562     KMR_KVS *ppssn2 = JOINP(pps2, sn2, KMR_KV_OPAQUE,
  2563                             &q9_join_hp_ns, 
"ns+pps", pushoff);
  2567     if (mr->rank == 0) {printf(
"q9 (l+o)...\n"); fflush(0);}
  2569     KMR_KVS *l1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
  2571     if (run->load_tables_in_advance) {
  2572         kmr_map(L0, l1, &nth, kmr_noopt, key_by_nth);
  2574         scan_table_files(l1, TAB_L, key_by_nth, &nth, run);
  2577     KMR_KVS *o1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
  2579     if (run->load_tables_in_advance) {
  2580         kmr_map(O0, o1, &nth, kmr_noopt, key_by_nth);
  2582         scan_table_files(o1, TAB_O, key_by_nth, &nth, run);
  2585     KMR_KVS *lo2 = JOINP(l1, o1, KMR_KV_OPAQUE,
  2586                          &q9_join_l_o, 
"l+o", pushoff);
  2590     if (mr->rank == 0) {printf(
"q9 (((p+ps)+(s+n))+(l+o))...\n"); fflush(0);}
  /* The last join and the aggregation pass 0 as the pushoff argument. */
  2592     KMR_KVS *ppssnlo2 = JOINP(ppssn2, lo2, KMR_KV_OPAQUE,
  2593                               &q9_join_hnps_lo, 
"lo+nppss", 0);
  2594     KMR_KVS *ppssnlo3 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
  2595     kmr_map(ppssnlo2, ppssnlo3, 0, kmr_noopt, q9_calculate_amount);
  2597     pcount(ppssnlo3, 0, 
"q9_calculate_amount", 1);
  2599     KMR_KVS *ppssnlo5 = JOIN1(ppssnlo3, KMR_KV_OPAQUE,
  2600                               q9_sum_amount, 0, 
"sum(amount)", 0);
  2602     KMR_KVS *ppssnlo6 = CREATE_KVS(mr, KMR_KV_OPAQUE, 0);
  2603     kmr_sort(ppssnlo5, ppssnlo6, kmr_noopt);
  /* Selection (struct SELECT) and join (struct PRODUCT) descriptors for
     q10.  A SELECT lists plain column indexes; a PRODUCT lists pairs
     {column index, FST|SND}, where FST/SND presumably pick the first or
     second table of .inputs -- TODO confirm.  Several initializer lines
     are not visible in this listing. */
  2676 static struct SELECT q10_select_c = {
  2678     .output = TAB_Q10_C1,
  2682     .columns = {3, 0, 1, 5, 4, 2, 7},
  2687 static struct SELECT q10_select_n = {
  2689     .output = TAB_Q10_N1,
  2697 static struct PRODUCT q10_join_l_o = {
  2698     .inputs = {TAB_Q10_L1, TAB_Q10_O1},
  2699     .output = TAB_Q10_LO,
  2702     .columns = {{1, SND}, {1, FST}},
  2708 static struct PRODUCT q10_join_c_n = {
  2709     .inputs = {TAB_Q10_C1, TAB_Q10_N1},
  2710     .output = TAB_Q10_CN,
  2714     .columns = {{1, FST}, {2, FST}, {3, FST}, {4, FST},
  2715                 {1, SND}, {5, FST}, {6, FST}},
  /* The key of this join consists of all seven columns of the first
     input. */
  2721 static struct PRODUCT q10_join_cn_lo = {
  2722     .inputs = {TAB_Q10_CN, TAB_Q10_LO},
  2723     .output = TAB_Q10_CLNO0,
  2727     .columns = {{0, FST}, {1, FST}, {2, FST}, {3, FST},
  2728                 {4, FST}, {5, FST}, {6, FST}, {1, SND}},
  2730     .keys = {{0, FST}, {1, FST}, {2, FST}, {3, FST},
  2731              {4, FST}, {5, FST}, {6, FST}},
  /* Map function over lineitem tuples (TAB_L): when the first byte of
     returnflag equals the first byte of s0, appends
     volume = l_extendedprice * (1 - l_discount) to the value and emits
     the record.
     NOTE(review): the header line with the function name is not visible;
     this is presumably q10_select_by_flag, the map function q10()
     applies to lineitems.  The definitions of returnflag and s0 and the
     key/value column construction are not visible either. */
  2737                    KMR_KVS *kvo, 
void *p, 
const long i)
  2739     char kbuf[RECORD_SIZE];
  2740     struct kmr_ntuple *k = (
void *)kbuf;
  2741     char vbuf[RECORD_SIZE];
  2742     struct kmr_ntuple *v = (
void *)vbuf;
  2744     enum TABLE input = TAB_L;
  2745     enum TABLE output = TAB_Q10_L1;
  2746     struct RECORD *d = find_description(input);
  2748     KMR *mr = kvo->c.mr;
  2749     struct kmr_ntuple *u = (
void *)kv0.v.p;
  /* Only one byte is compared. */
  2752     if (strncmp((
char *)returnflag.p, s0, 1) == 0) {
  2753         double extendedprice = get_real_column(u, d, 
"l_extendedprice");
  2754         double discount = get_real_column(u, d, 
"l_discount");
  2755         double volume = (extendedprice * (1 - discount));
  2762         kmr_put_ntuple(mr, v, (
int)
sizeof(vbuf), &volume, 
sizeof(volume));
  2763         add_record(kvo, k, v);
  /* Map function over order tuples (TAB_O): keeps tuples whose
     o_orderdate lies in the inclusive range [tv[0], tv[1]] and emits
     o_orderkey and o_custkey as the value.
     NOTE(review): the header line with the function name is not visible;
     this is presumably q10_select_by_date, the map function q10()
     applies to orders.  The definition of tv (presumably taken from P)
     and the key construction are not visible either. */
  2771                    KMR_KVS *kvo, 
void *p, 
const long i)
  2773     char kbuf[RECORD_SIZE];
  2774     struct kmr_ntuple *k = (
void *)kbuf;
  2775     char vbuf[RECORD_SIZE];
  2776     struct kmr_ntuple *v = (
void *)vbuf;
  2778     enum TABLE input = TAB_O;
  2779     enum TABLE output = TAB_Q10_O1;
  2780     struct RECORD *d = find_description(input);
  2783     KMR *mr = kvo->c.mr;
  2784     struct kmr_ntuple *u = (
void *)kv0.v.p;
  2785     assert(u->marker == (
int)input);
  2787     time_t orderdate = get_date_column(u, d, 
"o_orderdate");
  /* NOTE(review): the upper bound is inclusive, and q10() passes
     "1994-01-01" as tv[1].  The TPC-H Q10 predicate is
     o_orderdate < start + 3 months, so orders dated exactly 1994-01-01
     look like they are wrongly included -- confirm against the
     specification. */
  2788     if (tv[0] <= orderdate && orderdate <= tv[1]) {
  2791         char *cols[] = {
"o_orderkey", 
"o_custkey"};
  2796         put_columns_by_names(mr, v, 
sizeof(vbuf), u, d, cols, 2);
  2797         add_record(kvo, k, v);
  /* Reduce function over TAB_Q10_CLNO0 tuples: reads the volume (column
     7) of every tuple in the key group and emits one record keyed by the
     negated revenue (a double).  The value holds c_custkey, c_name,
     revenue, then c_acctbal, n_name, c_address, c_phone and c_comment
     taken from the first tuple of the group.
     NOTE(review): the declaration of revenue and the accumulation in the
     loop are not visible.  Negating the key presumably makes the later
     sort descending by revenue -- TODO confirm. */
  2803 q10_sum_volume(
const struct kmr_kv_box kv[], 
const long n,
  2806     char kbuf[RECORD_SIZE];
  2807     struct kmr_ntuple *k = (
void *)kbuf;
  2808     char vbuf[RECORD_SIZE];
  2809     struct kmr_ntuple *v = (
void *)vbuf;
  2811     enum TABLE input = TAB_Q10_CLNO0;
  2812     enum TABLE output = TAB_Q10_CLNO1;
  2813     struct RECORD *d = find_description(input);
  2815     KMR *mr = kvo->c.mr;
  2816     struct kmr_ntuple *u0 = (
void *)kv[0].v.p;
  2817     assert(u0->marker == (
int)input);
  2821     for (
long i = 0; i < n; i++) {
  2822         struct kmr_ntuple *u = (
void *)kv[i].v.p;
  2823         double volume = get_real_column_by_index(u, d->columns, 7);
  /* Columns placed before (cols0) and after (cols1) revenue in the value. */
  2827     char *cols0[] = {
"c_custkey", 
"c_name"};
  2828     char *cols1[] = {
"c_acctbal", 
"n_name", 
"c_address",
  2829                      "c_phone", 
"c_comment"};
  2831     double negrevenue = -revenue;
  2834     kmr_put_ntuple(mr, k, (
int)
sizeof(kbuf), &negrevenue, 
sizeof(
double));
  2836     put_columns_by_names(mr, v, 
sizeof(vbuf), u0, d, cols0, 2);
  2837     kmr_put_ntuple(mr, v, (
int)
sizeof(vbuf), &revenue, 
sizeof(revenue));
  2838     put_columns_by_names(mr, v, 
sizeof(vbuf), u0, d, cols1, 5);
  2839     add_record(kvo, k, v);
  /* Driver of query q10.  Visible pipeline: select lineitems by flag,
     select orders by date and join them (l+o); select customers and
     nations and join them (c+n); join both results (cn+lo); sum the
     volume per key into a KVS keyed by a FLOAT8; then "sort" and
     "choose" stages.
     Tables are mapped from preloaded KVSes (L0, O0, C0, N0) when
     run->load_tables_in_advance is set, or read via scan_table_files()
     otherwise.  Progress lines are printed by rank 0.
     NOTE(review): several lines (else branches, the declaration of tv,
     the sort/choose calls, frees) are not visible in this listing. */
  2845 q10(
KMR *mr, 
struct RUN *run)
  /* Normalize run->pushoff to 0/1. */
  2847     _Bool pushoff = ((run->pushoff == 0) ? 0 : 1);
  2850         if (mr->rank == 0) {printf(
"q10 (with push-off)...\n"); fflush(0);}
  2852         if (mr->rank == 0) {printf(
"q10...\n"); fflush(0);}
  2857     if (mr->rank == 0) {printf(
"q10 (l)...\n"); fflush(0);}
  2859     KMR_KVS *l1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
  2860     if (run->load_tables_in_advance) {
  2861         kmr_map(L0, l1, 0, kmr_noopt, q10_select_by_flag);
  2863         scan_table_files(l1, TAB_L, q10_select_by_flag, 0, run);
  2868     if (mr->rank == 0) {printf(
"q10 (o)...\n"); fflush(0);}
  /* Order-date window handed to q10_select_by_date().
     NOTE(review): that function compares with <= on both ends, so
     1994-01-01 itself is included -- confirm against the TPC-H Q10
     definition, which uses a strict upper bound. */
  2871     tv[0] = decode_date(
"1993-10-01");
  2872     assert(tv[0] != (time_t)-1);
  2873     tv[1] = decode_date(
"1994-01-01");
  2874     assert(tv[1] != (time_t)-1);
  2875     KMR_KVS *o1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
  2876     if (run->load_tables_in_advance) {
  2877         kmr_map(O0, o1, tv, kmr_noopt, q10_select_by_date);
  2879         scan_table_files(o1, TAB_O, q10_select_by_date, tv, run);
  2884     KMR_KVS *lo = JOINP(l1, o1, KMR_KV_OPAQUE,
  2885                         &q10_join_l_o, 
"l+o", pushoff);
  2889     if (mr->rank == 0) {printf(
"q10 (c)...\n"); fflush(0);}
  2891     KMR_KVS *c1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
  2892     if (run->load_tables_in_advance) {
  2893         kmr_map(C0, c1, &q10_select_c, kmr_noopt, select_by_fields);
  2895         scan_table_files(c1, TAB_C, select_by_fields, &q10_select_c, run);
  2898     if (mr->rank == 0) {printf(
"q10 (n)...\n"); fflush(0);}
  2900     KMR_KVS *n1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
  2901     if (run->load_tables_in_advance) {
  2902         kmr_map(N0, n1, &q10_select_n, kmr_noopt, select_by_fields);
  2904         scan_table_files(n1, TAB_N, select_by_fields, &q10_select_n, run);
  2907     if (mr->rank == 0) {printf(
"q10 (c+n)...\n"); fflush(0);}
  2909     KMR_KVS *cn = JOINP(c1, n1, KMR_KV_OPAQUE,
  2910                         &q10_join_c_n, 
"c+n", pushoff);
  2914     if (mr->rank == 0) {printf(
"q10 (cn+lo)...\n"); fflush(0);}
  2916     KMR_KVS *clno0 = JOINP(cn, lo, KMR_KV_OPAQUE,
  2917                            &q10_join_cn_lo, 
"cn+lo", pushoff);
  2919     if (mr->rank == 0) {printf(
"q10 (sum(volume))...\n"); fflush(0);}
  /* From here on the key is a double (the negated revenue written by
     q10_sum_volume). */
  2921     KMR_KVS *clno1 = JOIN1(clno0, KMR_KV_FLOAT8,
  2922                            q10_sum_volume, 0, 
"sum(volume)", 0);
  2926     KMR_KVS *clno2 = CREATE_KVS(mr, KMR_KV_FLOAT8, 0);
  2929     pcount(clno2, 0, 
"sort", 1);
  2931     KMR_KVS *clno3 = CREATE_KVS(mr, KMR_KV_FLOAT8, 0);
  2934     pcount(clno3, 0, 
"choose", 1);
  /* Map function over order tuples (TAB_O): looks in the comment text
     for "special" followed (at or after that match) by "requests", both
     searches bounded by the end of the comment, and emits a record only
     when that pattern is NOT found.
     NOTE(review): the definition of comment (presumably column 8, which
     is asserted to be F_TEXT) and the key/value construction are not
     visible in this listing. */
  2981 q13_select_by_string(
const struct kmr_kv_box kv0,
  2983                      void *p, 
const long i)
  2985     char kbuf[RECORD_SIZE];
  2986     struct kmr_ntuple *k = (
void *)kbuf;
  2987     char vbuf[RECORD_SIZE];
  2988     struct kmr_ntuple *v = (
void *)vbuf;
  2990     enum TABLE input = TAB_O;
  2991     enum TABLE output = TAB_Q13_O1;
  2992     struct RECORD *d = find_description(input);
  2994     KMR *mr = kvo->c.mr;
  2995     struct kmr_ntuple *u = (
void *)kv0.v.p;
  2997     assert(d->columns[8].field == F_TEXT);
  /* p1: first "special"; p2: "requests" searched from p1 up to end. */
  2999     char *p0 = (comment.p);
  3000     char *end = &(p0[comment.len]);
  3001     char *p1 = strnstr_(p0, 
"special", (
size_t)(end - p0));
  3002     char *p2 = ((p1 == 0) ? 0 : strnstr_(p1, 
"requests", (
size_t)(end - p1)));
  /* Keep the order unless both words were found in sequence. */
  3004     if (!(p1 != 0 && p2 != 0)) {
  3012         add_record(kvo, k, v);
  /* Reduce function joining customers (TAB_C) with the selected orders
     (TAB_Q13_O1): it requires exactly one tuple from the first input,
     takes the number of tuples from the second input as count, and
     emits one record.
     NOTE(review): the call that fills vv/cnt, the definition of custkey
     and the key/value construction are not visible; presumably the
     output pairs the customer with its order count -- TODO confirm. */
  3018 q13_join_c_o(
const struct kmr_kv_box kv[], 
const long n,
  3021     char kbuf[RECORD_SIZE];
  3022     struct kmr_ntuple *k = (
void *)kbuf;
  3023     char vbuf[RECORD_SIZE];
  3024     struct kmr_ntuple *v = (
void *)vbuf;
  3026     int inputs[2] = {TAB_C, TAB_Q13_O1};
  3027     int output = TAB_Q13_CO0;
  3030     KMR *mr = kvo->c.mr;
  3032     struct kmr_ntuple **vv[2];
  /* Exactly one tuple is expected from the first input per key. */
  3036     assert(cnt[0] == 1);
  3038     long count = cnt[1];
  3040     struct kmr_ntuple *u0 = vv[0][0];
  3042     assert(custkey.len == 
sizeof(
long));
  3048     add_record(kvo, k, v);
  /* Reduce function over records keyed by an integer count: reads count
     from the key of the first key-value, converts the negated custdist
     and the negated count with htonll_(), and emits one record.
     NOTE(review): the definition of custdist and the statements that put
     becustdist/becount into the output are not visible; presumably
     custdist is the group size n and the negation gives descending sort
     order -- TODO confirm. */
  3056 q13_join_co(
const struct kmr_kv_box kv[], 
const long n,
  3059     char kbuf[RECORD_SIZE];
  3060     struct kmr_ntuple *k = (
void *)kbuf;
  3061     char vbuf[RECORD_SIZE];
  3062     struct kmr_ntuple *v = (
void *)vbuf;
  3065     enum TABLE output = TAB_Q13_CO1;
  3067     KMR *mr = kvo->c.mr;
  /* The key is an integer of the size of long. */
  3068     assert(kv[0].klen == 
sizeof(
long));
  3070     long count = kv[0].k.i;
  /* The casts below rely on long being 64 bits wide. */
  3074     assert(
sizeof(uint64_t) == 
sizeof(
long));
  3075     uint64_t becustdist = htonll_((uint64_t)(-custdist));
  3076     uint64_t becount = htonll_((uint64_t)(-count));
  3084     add_record(kvo, k, v);
  /* Driver of query q13.  Visible pipeline: select orders whose comment
     does not match the special/requests pattern, join customers with
     those orders into a KVS keyed by an integer (c+o), reduce that by
     q13_join_co, and create a final KVS co5.
     Tables are mapped from preloaded KVSes (O0, C0) when
     run->load_tables_in_advance is set, or read via scan_table_files()
     otherwise.  Progress lines are printed by rank 0.
     NOTE(review): several lines (else branches, the use of co5, frees)
     are not visible in this listing. */
  3090 q13(
KMR *mr, 
struct RUN *run)
  /* Normalize run->pushoff to 0/1. */
  3092     _Bool pushoff = ((run->pushoff == 0) ? 0 : 1);
  3095         if (mr->rank == 0) {printf(
"q13 (with push-off)...\n"); fflush(0);}
  3097         if (mr->rank == 0) {printf(
"q13...\n"); fflush(0);}
  3102     if (mr->rank == 0) {printf(
"q13 (o)...\n"); fflush(0);}
  3104     KMR_KVS *o1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
  3105     if (run->load_tables_in_advance) {
  3106         kmr_map(O0, o1, 0, kmr_noopt, q13_select_by_string);
  3108         scan_table_files(o1, TAB_O, q13_select_by_string, 0, run);
  3111     pcount(o1, 0, 
"select (o)", 1);
  3115     if (run->load_tables_in_advance) {
  3118         KMR_KVS *c0 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
  3119         scan_table_files(c0, TAB_C, 0, 0, run);
  /* Both join steps are labelled "c+o" in the reports. */
  3122     KMR_KVS *co2 = JOIN2(C0, o1, KMR_KV_INTEGER,
  3123                          q13_join_c_o, 0, 
"c+o", pushoff);
  3125     KMR_KVS *co4 = JOIN1(co2, KMR_KV_OPAQUE,
  3126                          q13_join_co, 0, 
"c+o", 0);
  3128     KMR_KVS *co5 = CREATE_KVS(mr, KMR_KV_OPAQUE, 0);
  /* Join descriptors (struct PRODUCT) for q21.  Output columns are pairs
     {column index, FST|SND}; FST/SND presumably pick the first or second
     table of .inputs -- TODO confirm.  Several initializer lines are not
     visible in this listing. */
  3241 static struct PRODUCT q21_join_n_s = {
  3242     .inputs = {TAB_Q21_N1, TAB_S},
  3243     .output = TAB_Q21_NS,
  3246     .columns = {{0, SND}, {1, SND}},
  3252 static struct PRODUCT q21_join_l_ns = {
  3253     .inputs = {TAB_Q21_L1, TAB_Q21_NS},
  3254     .output = TAB_Q21_LNS,
  3257     .columns = {{0, FST}, {1, FST}, {1, SND}},
  /* NOTE(review): the output table equals the first input table and all
     output columns come from the first input, so this join only filters
     TAB_Q21_LNS by the presence of matching orders -- confirm that
     reusing the TAB_Q21_LNS marker is intended. */
  3263 static struct PRODUCT q21_join_lns_o = {
  3264     .inputs = {TAB_Q21_LNS, TAB_Q21_O1},
  3265     .output = TAB_Q21_LNS,
  3268     .columns = {{0, FST}, {1, FST}, {2, FST}},
  /* Map function over nation tuples (TAB_N): emits a record only for the
     nation whose name is exactly "SAUDI ARABIA" (both the length and the
     bytes must match).
     NOTE(review): the definition of n_name (presumably column 1, which
     is asserted to be F_TEXT) and the key/value construction are not
     visible in this listing. */
  3275 q21_select_n_by_name(
const struct kmr_kv_box kv0,
  3277                      void *p, 
const long i)
  3279     char kbuf[RECORD_SIZE];
  3280     struct kmr_ntuple *k = (
void *)kbuf;
  3281     char vbuf[RECORD_SIZE];
  3282     struct kmr_ntuple *v = (
void *)vbuf;
  3284     enum TABLE input = TAB_N;
  3285     enum TABLE output = TAB_Q21_N1;
  3287     struct RECORD *d = find_description(input);
  3289     KMR *mr = kvo->c.mr;
  3290     struct kmr_ntuple *u = (
void *)kv0.v.p;
  3292     assert(d->columns[1].field == F_TEXT);
  3294     char *s0 = 
"SAUDI ARABIA";
  3295     size_t len = strlen(s0);
  /* Checking the length first makes this an exact match, not a prefix
     match. */
  3296     if (((
size_t)n_name.len == len)
  3297         && (strncmp((
char *)n_name.p, s0, len) == 0)) {
  3304         add_record(kvo, k, v);
  /* Map function over lineitem tuples (TAB_L) producing TAB_Q21_L1
     records: emits a record only when l_receiptdate is later than
     l_commitdate.
     NOTE(review): the header line with the function name and the
     key/value construction are not visible in this listing. */
  3311                       KMR_KVS *kvo, 
void *p, 
const long i)
  3313     char kbuf[RECORD_SIZE];
  3314     struct kmr_ntuple *k = (
void *)kbuf;
  3315     char vbuf[RECORD_SIZE];
  3316     struct kmr_ntuple *v = (
void *)vbuf;
  3318     enum TABLE input = TAB_L;
  3319     enum TABLE output = TAB_Q21_L1;
  3320     struct RECORD *d = find_description(input);
  3322     KMR *mr = kvo->c.mr;
  3323     struct kmr_ntuple *u = (
void *)kv0.v.p;
  3325     time_t receiptdate = get_date_column(u, d, 
"l_receiptdate");
  3326     time_t commitdate = get_date_column(u, d, 
"l_commitdate");
  /* Strict comparison: equal dates are not selected. */
  3327     if (receiptdate > commitdate) {
  3336         add_record(kvo, k, v);
  /* Map function over lineitem tuples (TAB_L) producing TAB_Q21_L3
     records: emits a record only when l_receiptdate is later than
     l_commitdate.  The visible statements are the same as in the
     TAB_Q21_L1 variant except for the output table.
     NOTE(review): the header line with the function name and the
     key/value construction are not visible in this listing. */
  3344                       KMR_KVS *kvo, 
void *p, 
const long i)
  3346     char kbuf[RECORD_SIZE];
  3347     struct kmr_ntuple *k = (
void *)kbuf;
  3348     char vbuf[RECORD_SIZE];
  3349     struct kmr_ntuple *v = (
void *)vbuf;
  3351     enum TABLE input = TAB_L;
  3352     enum TABLE output = TAB_Q21_L3;
  3353     struct RECORD *d = find_description(input);
  3355     KMR *mr = kvo->c.mr;
  3356     struct kmr_ntuple *u = (
void *)kv0.v.p;
  3358     time_t receiptdate = get_date_column(u, d, 
"l_receiptdate");
  3359     time_t commitdate = get_date_column(u, d, 
"l_commitdate");
  /* Strict comparison: equal dates are not selected. */
  3360     if (receiptdate > commitdate) {
  3369         add_record(kvo, k, v);
  /* Selection descriptor whose output table is TAB_Q21_L3.
     NOTE(review): the input table and the column/key lists are not
     visible in this listing. */
  3375 static struct SELECT q21_copy_l = {
  3377     .output = TAB_Q21_L3,
  /* Map function over order tuples (TAB_O) producing TAB_Q21_O1 records:
     acts only on orders whose status field is the single character 'F'.
     The visible initializer sets a key length from orderkey.len.
     NOTE(review): the header line with the function name, the
     definitions of status and orderkey, and the rest of the emitted
     key-value are not visible in this listing. */
  3386                      KMR_KVS *kvo, 
void *p, 
const long i)
  3388     char vbuf[RECORD_SIZE];
  3389     struct kmr_ntuple *v = (
void *)vbuf;
  3391     enum TABLE input = TAB_O;
  3392     enum TABLE output = TAB_Q21_O1;
  3393     struct RECORD *d = find_description(input);
  3395     KMR *mr = kvo->c.mr;
  3396     struct kmr_ntuple *u = (
void *)kv0.v.p;
  3398     char *s0 = (
char *)status.p;
  /* Exact one-character match on the status. */
  3399     if (status.len == 1 && s0[0] == 
'F') {
  3406             .klen = orderkey.len,
  /* Reduce function over a key group mixing TAB_Q21_LNS and TAB_Q21_L3
     tuples: for each tuple of the first input it scans the tuples of the
     second input, comparing the supplier keys (column 1 of each), and
     emits the first-input tuple u0 under the key k.
     NOTE(review): the call that fills vv/cnt, the body of the
     suppkey0 != suppkey1 branch, the key construction and the exact
     position of the emit are not visible; presumably u0 is emitted when
     a tuple with a different supplier exists -- TODO confirm. */
  3418 q21_join_lnos_l2(
const struct kmr_kv_box kv[], 
const long n,
  3421     char kbuf[RECORD_SIZE];
  3422     struct kmr_ntuple *k = (
void *)kbuf;
  3426     int inputs[] = {TAB_Q21_LNS, TAB_Q21_L3};
  3428     struct RECORD *d0 = find_description((
enum TABLE)inputs[0]);
  3429     struct RECORD *d1 = find_description((
enum TABLE)inputs[1]);
  3431     KMR *mr = kvo->c.mr;
  3432     struct kmr_ntuple **vv[2];
  /* Nested scan: every first-input tuple against every second-input
     tuple. */
  3436     for (
long i0 = 0; i0 < cnt[0]; i0++) {
  3437         struct kmr_ntuple *u0 = (
void *)vv[0][i0];
  3438         long suppkey0 = get_int_column_by_index(u0, d0->columns, 1);
  3440         for (
long i1 = 0; i1 < cnt[1]; i1++) {
  3441             struct kmr_ntuple *u1 = (
void *)vv[1][i1];
  3442             long suppkey1 = get_int_column_by_index(u1, d1->columns, 1);
  3443             if (suppkey0 != suppkey1) {
  3452             add_record(kvo, k, u0);
  /* Reduce function over a key group mixing TAB_Q21_LNS and TAB_Q21_L3
     tuples: for each tuple of the first input it starts with the flag
     nonexists set, scans the tuples of the second input comparing the
     supplier keys (column 1 of each), and emits a record (k, v).
     NOTE(review): the call that fills vv/cnt, the body of the
     suppkey0 != suppkey1 branch, the test of nonexists and the key/value
     construction are not visible; presumably the flag is cleared when a
     different supplier is found and the record is emitted only while it
     is still set -- TODO confirm. */
  3460 q21_join_lnos_l3(
const struct kmr_kv_box kv[], 
const long n,
  3463     char kbuf[RECORD_SIZE];
  3464     struct kmr_ntuple *k = (
void *)kbuf;
  3465     char vbuf[RECORD_SIZE];
  3466     struct kmr_ntuple *v = (
void *)vbuf;
  3468     int inputs[] = {TAB_Q21_LNS, TAB_Q21_L3};
  3471     struct RECORD *d0 = find_description((
enum TABLE)inputs[0]);
  3472     struct RECORD *d1 = find_description((
enum TABLE)inputs[1]);
  3474     KMR *mr = kvo->c.mr;
  3475     struct kmr_ntuple **vv[2];
  3479     for (
long i0 = 0; i0 < cnt[0]; i0++) {
  3480         struct kmr_ntuple *u0 = (
void *)vv[0][i0];
  3481         long suppkey0 = get_int_column_by_index(u0, d0->columns, 1);
  /* Reset for each first-input tuple. */
  3482         _Bool nonexists = 1;
  3483         for (
long i1 = 0; i1 < cnt[1]; i1++) {
  3484             struct kmr_ntuple *u1 = (
void *)vv[1][i1];
  3485             long suppkey1 = get_int_column_by_index(u1, d1->columns, 1);
  3486             if (suppkey0 != suppkey1) {
  3497             add_record(kvo, k, v);
  /* Reduce function used by q21 for the JOIN1 step labelled "numwait".
     It reads the first value of the group (kv[0]) as a tuple described
     by TAB_Q21_NAME, and builds a key tuple k and a value tuple v that
     are passed to add_record().  The output table tag is
     TAB_Q21_NUMWAIT. */
  3505 q21_join_numwait(
const struct kmr_kv_box kv[], 
const long n,
  3508     char kbuf[RECORD_SIZE];
  3509     struct kmr_ntuple *k = (
void *)kbuf;
  3510     char vbuf[RECORD_SIZE];
  3511     struct kmr_ntuple *v = (
void *)vbuf;
  3513     enum TABLE input = TAB_Q21_NAME;
  3514     enum TABLE output = TAB_Q21_NUMWAIT;
  3516     struct RECORD *d = find_description((
enum TABLE)input);
  3518     KMR *mr = kvo->c.mr;
  3520     struct kmr_ntuple *u = (
void *)kv[0].v.p;
           /* Negated number of values reduced under this key.  Presumably
              negated so that an ascending sort on the key lists the
              largest counts first -- confirm against the sort in q21. */
  3523     long negnumwait = -n;
           /* Copy s_name into a zero-filled buffer of NAME_SIZE bytes;
              the assert guards the memcpy against overrunning nbuf.
              NOTE(review): where s_name comes from is not visible here. */
  3525     char nbuf[NAME_SIZE];
  3526     assert(s_name.len <= NAME_SIZE);
  3527     memset(nbuf, 0, 
sizeof(nbuf));
  3528     memcpy(nbuf, s_name.p, s_name.len);
           /* The negated count is stored in the key tuple as raw bytes of
              a long. */
  3531     kmr_put_ntuple(mr, k, (
int)
sizeof(kbuf), &negnumwait, 
sizeof(
long));
  3536     add_record(kvo, k, v);
  /* Runs query 21 on context MR with the settings in RUN.
     Visible steps: select from the N and S tables and join them ("n+s");
     filter L into l1 and join with the previous result ("l+ns"); select
     from O and join ("lns+o"); then two more joins against further
     selections of L ("l2+lnos", "l3+lnos"); finally a reduction labelled
     "numwait", followed by stages reported as "sort" and "choose".
     Push-off is enabled for the intermediate streams whenever
     run->pushoff is nonzero.  When run->load_tables_in_advance is set,
     the preloaded streams N0/S0/O0 are mapped; otherwise the table
     files are scanned here.
     NOTE(review): the return value and the lines creating ex3/ex4 are
     not visible in this listing. */
  3542 q21(
KMR *mr, 
struct RUN *run)
           /* Collapse the push-off setting (0, 1 or 2 per the help text in
              main) to a flag. */
  3544     _Bool pushoff = ((run->pushoff == 0) ? 0 : 1);
               /* Progress messages are printed by rank 0 only. */
  3548         if (mr->rank == 0) {printf(
"q21 (with push-off)...\n"); fflush(0);}
  3550         if (mr->rank == 0) {printf(
"q21...\n"); fflush(0);}
           /* n1: selection from table N via q21_select_n_by_name. */
  3555     KMR_KVS *n1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
  3556     if (run->load_tables_in_advance) {
  3557         kmr_map(N0, n1, 0, kmr_noopt, q21_select_n_by_name);
  3559         scan_table_files(n1, TAB_N, q21_select_n_by_name, 0, run);
           /* s1: table S passed through key_by_nth with argument &nth. */
  3562     KMR_KVS *s1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
  3564     if (run->load_tables_in_advance) {
  3565         kmr_map(S0, s1, &nth, kmr_noopt, key_by_nth);
  3567         scan_table_files(s1, TAB_S, key_by_nth, &nth, run);
  3572     KMR_KVS *ns0 = JOINP(n1, s1, KMR_KV_OPAQUE,
  3573                          &q21_join_n_s, 
"n+s", pushoff);
  3577     if (run->load_tables_in_advance) {
               /* Table L is scanned without a map function and without
                  push-off on this path. */
  3580         KMR_KVS *l0 = CREATE_KVS(mr, KMR_KV_OPAQUE, 0);
  3581         scan_table_files(l0, TAB_L, 0, 0, run);
           /* L0 is mapped three times in this function (into l1, l2 and
              l3), each time with the `inspect` option. */
  3584     KMR_KVS *l1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
  3585     kmr_map(L0, l1, 0, inspect, q21_select_l1_by_date);
  3587     pcount(l1, 0, 
"l1", 1);
  3589     KMR_KVS *lns0 = JOINP(l1, ns0, KMR_KV_OPAQUE,
  3590                           &q21_join_l_ns, 
"l+ns", pushoff);
           /* o1: selection from table O via q21_select_o_by_flag. */
  3594     KMR_KVS *o1 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
  3595     if (run->load_tables_in_advance) {
  3596         kmr_map(O0, o1, 0, kmr_noopt, q21_select_o_by_flag);
  3598         scan_table_files(o1, TAB_O, q21_select_o_by_flag, 0, run);
  3601     pcount(o1, 0, 
"o1", 1);
  3605     KMR_KVS *lnos0 = JOINP(lns0, o1, KMR_KV_OPAQUE,
  3606                            &q21_join_lns_o, 
"lns+o", pushoff);
           /* l2: copy of L fields chosen by the q21_copy_l descriptor. */
  3612     KMR_KVS *l2 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
  3613     kmr_map(L0, l2, &q21_copy_l, inspect, select_by_fields);
  3615     KMR_KVS *lnos2 = JOIN2(lnos0, l2, KMR_KV_OPAQUE,
  3616                            q21_join_lnos_l2, 0, 
"l2+lnos", pushoff);
  3620     KMR_KVS *l3 = CREATE_KVS(mr, KMR_KV_OPAQUE, pushoff);
  3621     kmr_map(L0, l3, 0, inspect, q21_select_l3_by_date);
  3624     KMR_KVS *name0 = JOIN2(lnos2, l3, KMR_KV_OPAQUE,
  3625                            q21_join_lnos_l3, 0, 
"l3+lnos", pushoff);
           /* The final reduction is invoked with 0 as its last argument
              rather than `pushoff`. */
  3631     KMR_KVS *ex2 = JOIN1(name0, KMR_KV_OPAQUE,
  3632                          q21_join_numwait, 0, 
"numwait", 0);
  3639     pcount(ex3, 0, 
"sort", 1);
  3646     pcount(ex4, 0, 
"choose", 1);
  /* Parses S as a decimal integer and checks it against the N accepted
     values in OK[].  M is a message prefix used in the diagnostics
     written to stderr ("Not integer" for unparsable input, "Not
     acceptable" for a value that is not in OK[]).
     NOTE(review): the return statements are not visible in this
     listing; main compares the result with -1, so -1 is presumably the
     failure value -- confirm. */
  3654 atoi_safe(
char *s, 
int ok[], 
int n, 
char *m)
           /* "%d%c" also tries to read one character after the number, so
              the conversion count can reveal trailing text.  NOTE(review):
              the test on cc is not visible here. */
  3658     int cc = sscanf(s, 
"%d%c", &v, gomi);
  3660         fprintf(stderr, 
"%s.  Not integer (%s).\n", m, s);
           /* Scan over the accepted values. */
  3663     for (
int i = 0; i < n; i++) {
  3668     fprintf(stderr, 
"%s.  Not acceptable (%d).\n", m, v);
  /* Per-run settings filled in by main, one entry per query given on the
     command line.  main reports "run list too long" when the number of
     runs reaches the capacity of this array. */
  3672 struct RUN runs[50];
  /* Program entry point.  Initializes MPI, parses the global options
     (-h, -C, -F, -P) and then a sequence of `query [options]` groups
     into runs[], and executes each run's query (7, 9, 10, 13 or 21)
     between MPI barriers, printing the options in effect and the
     elapsed time.  Many lines of this function are not visible in this
     listing, so the comments below describe only what is shown. */
  3676 main(
int argc, 
char *argv[])
           /* Dates are read as time_t elsewhere in this file; presumably
              they are also handled as long, which this guards -- confirm. */
  3678     assert(
sizeof(
long) == 
sizeof(time_t));
           /* Force the time zone to UTC, overwriting any existing TZ. */
  3682     setenv(
"TZ", 
"UTC", 1);
           /* Usage text; the single %s is filled with argv[0]. */
  3685     char *helpstring = (
"%s directory-of-tables [-C -F -P]"  3686                         " query [options] query [options]...\n"  3687                         "  query={7,9,10,13,21}\n"  3688                         "  options: [-p po -b sz -a -g -r -s]\n"  3689                         "  -p po: po={0,1,2}, push-off setting\n"  3690                         "  -b sz: block size for push-off (in KB)\n"  3691                         "  -a: scan tables in advance\n"  3692                         "  -g: stop push-off at the end\n"  3693                         "  -r: redistribute table entries\n"  3694                         "  -s: use small block size\n"  3695                         "  -C: report count in messages\n"  3696                         "  -F: report file read time\n"  3697                         "  -P: report push-off statistics\n");
  3699     int nprocs, rank, thlv;
           /* MPI_THREAD_SERIALIZED is requested; the level actually
              provided is returned in thlv. */
  3701     MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &thlv);
  3702     MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
  3703     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  3707             fprintf(stderr, helpstring, argv[0]);
               /* Set the core-file size limit to zero; a failure is only
                  reported with perror here. */
  3716         struct rlimit core = {.rlim_cur = 0, .rlim_max = 0};
  3717         int cc = setrlimit(RLIMIT_CORE, &core);
  3718         if (cc != 0) {perror(
"setrlimit(core=0)");}
           /* The first argument names the directory of table files (see
              helpstring). */
  3723     char *directory = argv[1];
           /* Global options, parsed before the first query. */
  3727     while ((c = xxx_getopt(argc, argv, 
"hCFP")) != -1) {
  3732                 fprintf(stderr, helpstring, argv[0]);
  3740             report_count_in_messages = 1;
  3743             report_time_to_read = 1;
  3746             report_pushoff_statistics = 1;
  3751                 fprintf(stderr, 
"Unknown option (%c)\n", xxx_optopt);
  3752                 fprintf(stderr, helpstring, argv[0]);
           /* No argument left after the global options: print the usage. */
  3760     if (xxx_optind >= argc) {
  3761         fprintf(stderr, helpstring, argv[0]);
           /* Each iteration consumes one query number and its options. */
  3767     while (xxx_optind < argc) {
  3768         int qset[] = {7, 9, 10, 13, 21};
  3769         int nq = (int)(
sizeof(qset) / 
sizeof(qset[0]));
  3770         int query = atoi_safe(argv[xxx_optind], qset, nq, 
"Bad query");
  3773                 fprintf(stderr, helpstring, argv[0]);
               /* Defaults for this run, before its options are applied. */
  3780         runs[nruns].query = query;
  3781         runs[nruns].pushoff = 0;
  3782         runs[nruns].load_tables_in_advance = 0;
  3783         runs[nruns].hang_out_communication = 0;
  3784         runs[nruns].redistribute_loaded_tables = 0;
  3785         runs[nruns].use_small_block_size = 0;
  3786         runs[nruns].pushoff_block_size_in_kilo = 64;
  3789         while ((c = xxx_getopt(argc, argv, 
"p:b:agrs")) != -1) {
                       /* -p accepts only 0, 1 or 2. */
  3793                 int okset[] = {0, 1, 2};
  3794                 int nokset = (int)(
sizeof(okset) / 
sizeof(okset[0]));
  3795                 int pushoffv = atoi_safe(xxx_optarg, okset, nokset,
  3797                 if (pushoffv == -1) {
  3799                         fprintf(stderr, helpstring, argv[0]);
  3805                 runs[nruns].pushoff = pushoffv;
                       /* NOTE(review): unlike -p, the -b argument goes through
                          plain atoi() with no validation visible here; a
                          negative value would become a very large size_t
                          through the cast, and non-numeric text yields 0. */
  3811                 int bs = atoi(xxx_optarg);
  3812                 runs[nruns].pushoff_block_size_in_kilo = (size_t)bs;
  3817                 runs[nruns].load_tables_in_advance = 1;
  3820                 runs[nruns].hang_out_communication = 1;
  3823                 runs[nruns].redistribute_loaded_tables = 1;
  3826                 runs[nruns].use_small_block_size = 1;
  3831                     fprintf(stderr, 
"Unknown option (%c)\n", xxx_optopt);
  3832                     fprintf(stderr, helpstring, argv[0]);
               /* Reject a run list that would not fit in runs[]. */
  3841         if (nruns >= (
int)(
sizeof(runs)/
sizeof(runs[0]))) {
  3843                 fprintf(stderr, 
"run list too long\n");
  3844                 fprintf(stderr, helpstring, argv[0]);
               /* Build a one-line summary of the queries in ss.
                  NOTE(review): sc accumulates snprintf's return values, which
                  are the untruncated lengths; whether ss is large enough for
                  that to stay in bounds cannot be seen here. */
  3855         sc += snprintf((ss + sc), (
sizeof(ss) - (
size_t)sc),
  3856                        "Running (%d runs) with:", nruns);
  3857         for (
int i = 0; i < nruns; i++) {
  3858             sc += snprintf((ss + sc), (
sizeof(ss) - (
size_t)sc),
  3859                            " %d", runs[i].query);
  3861         snprintf((ss + sc), (
sizeof(ss) - (
size_t)sc), 
"\n");
               /* All eight base tables are handed to load_input_tables. */
  3871         enum TABLE t[] = {TAB_N, TAB_R, TAB_P, TAB_S,
  3872                           TAB_H, TAB_C, TAB_O, TAB_L};
  3873         int nt = (int)(
sizeof(t) / 
sizeof(t[0]));
  3874         load_input_tables(nprocs, rank, directory, t, nt);
  3883         mr->pushoff_fast_notice = 1;
           /* Execute the runs in command-line order. */
  3888     for (
int i = 0; i < nruns; i++) {
  3889         int query = runs[i].query;
  3890         _Bool pushoff = ((runs[i].pushoff == 0) ? 0 : 1);
               /* Transfer the run's settings to the KMR context.  Fast notice
                  is on only for push-off setting 2. */
  3895         mr->pushoff_hang_out = runs[i].hang_out_communication;
  3896         mr->pushoff_fast_notice = (runs[i].pushoff == 2);
  3897         mr->pushoff_stat = report_pushoff_statistics;
               /* -s selects a 64 KiB preset block size; otherwise 64 MiB. */
  3898         if (runs[i].use_small_block_size) {
  3899             mr->preset_block_size = (64 * 1024);
  3901             mr->preset_block_size = (64 * 1024 * 1024);
  3903         if (runs[i].pushoff_block_size_in_kilo != 0) {
  3904             mr->pushoff_block_size = (runs[i].pushoff_block_size_in_kilo
  3908         MPI_Barrier(MPI_COMM_WORLD);
                   /* The same banner goes to both stdout and stderr. */
  3911             fprintf(stdout, 
"Run Q%d (pushoff=%d fast_notice=%d)\n",
  3912                     query, pushoff, mr->pushoff_fast_notice);
  3913             fprintf(stderr, 
"Run Q%d (pushoff=%d fast_notice=%d)\n",
  3914                     query, pushoff, mr->pushoff_fast_notice);
  3920             printf(
"OPTION: (-p) pushoff=%d fast_notice=%d\n",
  3921                    pushoff, mr->pushoff_fast_notice);
  3922             printf(
"OPTION: (-a) load_tables_in_advance=%d\n",
  3923                    runs[i].load_tables_in_advance);
  3924             printf(
"OPTION: (-g) pushoff_hang_out=%d\n",
  3925                    mr->pushoff_hang_out);
  3926             printf(
"OPTION: (-r) redistribute_loaded_tables=%d\n",
  3927                    runs[i].redistribute_loaded_tables);
                   /* NOTE(review): %zd is the signed counterpart of %zu; if
                      these two fields are size_t, %zu would be the matching
                      specifier -- confirm the field types. */
  3928             printf(
"OPTION: (-s) preset_block_size=%zd\n",
  3929                    mr->preset_block_size);
  3930             printf(
"OPTION: (-b) pushoff_block_size=%zd\n",
  3931                    mr->pushoff_block_size);
  3932             printf(
"OPTION: (-C) report_count_in_messages=%d\n",
  3933                    report_count_in_messages);
  3934             printf(
"OPTION: (-F) report_time_to_read=%d\n",
  3935                    report_time_to_read);
  3936             printf(
"OPTION: (-P) report_pushoff_statistics=%d\n",
  3937                    report_pushoff_statistics);
  3944         enum TABLE tbl = TAB_NIL;
                   /* Query 7: with -a, its tables are scanned in advance; the
                      query call itself sits between two barriers.  The
                      other queries below follow the same pattern with their
                      own table lists. */
  3947             if (runs[i].load_tables_in_advance) {
  3948                 enum TABLE t7[] = {TAB_N, TAB_S, TAB_C, TAB_O, TAB_L};
  3950                 int nt7 = (int)(
sizeof(t7) / 
sizeof(t7[0]));
  3951                 scan_table_files_in_advance(mr, t7, nt7, &runs[i]);
  3953             MPI_Barrier(MPI_COMM_WORLD);
  3955             r = q7(mr, &runs[i]);
  3956             tbl = TAB_Q7_REVENUE;
  3957             MPI_Barrier(MPI_COMM_WORLD);
                   /* Query 9. */
  3962             if (runs[i].load_tables_in_advance) {
  3963                 enum TABLE t9[] = {TAB_N, TAB_P, TAB_S, TAB_H, TAB_O, TAB_L};
  3965                 int nt9 = (int)(
sizeof(t9) / 
sizeof(t9[0]));
  3966                 scan_table_files_in_advance(mr, t9, nt9, &runs[i]);
  3968             MPI_Barrier(MPI_COMM_WORLD);
  3970             r = q9(mr, &runs[i]);
  3971             tbl = TAB_Q9_AMOUNT;
  3972             MPI_Barrier(MPI_COMM_WORLD);
                   /* Query 10. */
  3977             if (runs[i].load_tables_in_advance) {
  3978                 enum TABLE t10[] = {TAB_N, TAB_C, TAB_O, TAB_L};
  3980                 int nt10 = (int)(
sizeof(t10) / 
sizeof(t10[0]));
  3981                 scan_table_files_in_advance(mr, t10, nt10, &runs[i]);
  3983             MPI_Barrier(MPI_COMM_WORLD);
  3985             r = q10(mr, &runs[i]);
  3986             tbl = TAB_Q10_CLNO1;
  3987             MPI_Barrier(MPI_COMM_WORLD);
                   /* Query 13.  NOTE(review): no assignment to tbl is visible
                      for this query in this listing. */
  3992             if (runs[i].load_tables_in_advance) {
  3993                 enum TABLE t13[] = {TAB_C, TAB_O, TAB_L};
  3995                 int nt13 = (int)(
sizeof(t13) / 
sizeof(t13[0]));
  3996                 scan_table_files_in_advance(mr, t13, nt13, &runs[i]);
  3998             MPI_Barrier(MPI_COMM_WORLD);
  4000             r = q13(mr, &runs[i]);
  4002             MPI_Barrier(MPI_COMM_WORLD);
                   /* Query 21. */
  4007             if (runs[i].load_tables_in_advance) {
  4008                 enum TABLE t21[] = {TAB_N, TAB_S, TAB_O, TAB_L};
  4010                 int nt21 = (int)(
sizeof(t21) / 
sizeof(t21[0]));
  4011                 scan_table_files_in_advance(mr, t21, nt21, &runs[i]);
  4013             MPI_Barrier(MPI_COMM_WORLD);
  4015             r = q21(mr, &runs[i]);
  4016             tbl = TAB_Q21_NUMWAIT;
  4017             MPI_Barrier(MPI_COMM_WORLD);
               /* Elapsed time of the query, reported by rank 0. */
  4022         if (rank == 0) {printf(
"Run Q%d in %f sec\n", query, (t1 - t0));}
  4027             if (mr->rank == 0) {
  4028                 printf(
"result count=%ld\n", rcnt);
               /* Push-off statistics are printed only when push-off was used
                  and -P was given. */
  4036         if (pushoff && mr->pushoff_stat) {
  4037             char *s = 
"STATISTICS on push-off kvs:\n";
  4038             kmr_print_statistics_on_pushoff(mr, s);
  4044     kmr_fin_pushoff_fast_notice_();
 Key-Value Stream (abstract). 
 
#define kmr_reduce(KVI, KVO, ARG, OPT, R)
Reduces key-value pairs. 
 
Options to Mapping, Shuffling, and Reduction. 
 
int kmr_add_kv(KMR_KVS *kvs, const struct kmr_kv_box kv)
Adds a key-value pair. 
 
KMR_KVS * kmr_create_pushoff_kvs(KMR *mr, enum kmr_kv_field kf, enum kmr_kv_field vf, struct kmr_option opt, const char *, const int, const char *)
Makes a new key-value stream with the specified field data-types. 
 
void kmr_reset_ntuple(struct kmr_ntuple *u, int n, int marker)
Resets an n-tuple U with N entries and a MARKER. 
 
int kmr_put_ntuple(KMR *mr, struct kmr_ntuple *u, const int sz, const void *v, const int vlen)
Adds an entry V of length VLEN to an n-tuple U whose size is limited to SZ. 
 
#define kmr_create_kvs(MR, KF, VF)
Makes a new key-value stream (of type KMR_KVS) with the specified field datatypes. 
 
struct kmr_ntuple_entry kmr_nth_ntuple(struct kmr_ntuple *u, int nth)
Returns an NTH entry of an n-tuple. 
 
int kmr_shuffle(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Shuffles key-value pairs to the appropriate destination ranks. 
 
int kmr_distribute(KMR_KVS *kvi, KMR_KVS *kvo, _Bool cyclic, struct kmr_option opt)
Distributes key-values so that each rank has approximately the same number of pairs. 
 
int kmr_add_kv_done(KMR_KVS *kvs)
Marks finished adding key-value pairs. 
 
int kmr_concatenate_kvs(KMR_KVS *kvs[], int nkvs, KMR_KVS *kvo, struct kmr_option opt)
Concatenates a number of KVSes to one. 
 
int kmr_free_kvs(KMR_KVS *kvs)
Releases a key-value stream (type KMR_KVS). 
 
void kmr_check_pushoff_fast_notice_(KMR *mr)
Checks if fast-notice works. 
 
kmr_kv_field
Datatypes of Keys or Values. 
 
#define kmr_map(KVI, KVO, ARG, OPT, M)
Maps simply. 
 
Handy Copy of a Key-Value Field. 
 
void kmr_init_pushoff_fast_notice_(MPI_Comm, _Bool verbose)
Initializes RDMA for fast-notice. 
 
int kmr_fin(void)
Clears the environment. 
 
int kmr_get_element_count(KMR_KVS *kvs, long *v)
Gets the total number of key-value pairs. 
 
#define kmr_init()
Sets up the environment. 
 
int kmr_sort(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Sorts a key-value stream globally. 
 
int kmr_size_ntuple(struct kmr_ntuple *u)
Returns the storage size of an n-tuple. 
 
int kmr_separate_ntuples(KMR *mr, const struct kmr_kv_box kv[], const long n, struct kmr_ntuple **vv[2], long cnt[2], int markers[2], _Bool disallow_other_entries)
Separates the n-tuples stored in the value part of KV into the two sets by their marker values...
 
int kmr_put_ntuple_long(KMR *mr, struct kmr_ntuple *u, const int sz, long v)
Adds an integer value V to an n-tuple U whose size is limited to SZ. 
 
int kmr_product_ntuples(KMR_KVS *kvo, struct kmr_ntuple **vv[2], long cnt[2], int newmarker, int slots[][2], int nslots, int keys[][2], int nkeys)
Makes a direct product of the two sets of n-tuples VV[0] and VV[1] with their counts in CNT[0] and CNT[1]...
 
int kmr_histogram_count_by_ranks(KMR_KVS *kvs, long *frq, double *var, _Bool rankzeroonly)
Fills an integer array FRQ[i] with the count of the elements of each rank. 
 
int kmr_free_context(KMR *mr)
Releases a context created with kmr_create_context(). 
 
int kmr_map_rank_by_rank(KMR_KVS *kvi, KMR_KVS *kvo, void *arg, struct kmr_option opt, kmr_mapfn_t m)
Maps sequentially with rank by rank for debugging. 
 
int kmr_replicate(KMR_KVS *kvi, KMR_KVS *kvo, struct kmr_option opt)
Replicates key-value pairs to be visible on all ranks, that is, it has the effect of bcast or all-gather...
 
int kmr_put_ntuple_entry(KMR *mr, struct kmr_ntuple *u, const int sz, struct kmr_ntuple_entry e)
Adds an n-tuple entry E to an n-tuple U whose size is limited to SZ. 
 
int(* kmr_redfn_t)(const struct kmr_kv_box kv[], const long n, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg)
Reduce-function Type. 
 
KMR_KVS * kmr_create_kvs7(KMR *mr, enum kmr_kv_field k, enum kmr_kv_field v, struct kmr_option opt, const char *, const int, const char *)
Makes a new key-value stream with the specified field data-types. 
 
int(* kmr_mapfn_t)(const struct kmr_kv_box kv, const KMR_KVS *kvi, KMR_KVS *kvo, void *arg, const long index)
Map-function Type. 
 
int kmr_choose_first_part(KMR_KVS *kvi, KMR_KVS *kvo, long n, struct kmr_option opt)
Chooses the first N entries from a key-value stream KVI. 
 
KMR * kmr_create_context(const MPI_Comm comm, const MPI_Info conf, const char *name)
Makes a new KMR context (a context has type KMR).