1
2
3
4 """Python Binding for KMR Map-Reduce Library. This provides
5 straightforward wrappers to the C routines. See more about KMR at
6 "http://mt.aics.riken.jp/kmr". All key-value data is stored in C
7 structures after encoding/decoding Python objects to byte arrays in C.
8 Documentation in Python is minimal, so please refer to the
9 documentation in C."""
10
11
12
13
14 import warnings
15 import ctypes
16 import cPickle
17 import inspect
18 import traceback
19 import sys
20
# Binding version; compared below with the kmr_version exported by libkmr.so.
21 __version__ = "20160425"
22
# Load the KMR C library; every C entry point below is reached via kmrso.
23 kmrso = ctypes.CDLL("libkmr.so")
24
25 """kmrso holds a libkmr.so library object."""
26
# A version mismatch only issues a RuntimeWarning; the import continues.
27 _kmrso_version = ctypes.c_int.in_dll(kmrso, "kmr_version").value
28 if (__version__ != str(_kmrso_version)):
29 warnings.warn(("Version unmatch with libkmr.so;"
30 + " found=" + str(_kmrso_version)
31 + " required=" + __version__),
32 RuntimeWarning)
33
34
# Pickle protocol passed to cPickle.dumps by KVS._encode_content for
# "opaque" fields.
35 cpickle_protocol = 0
36
37 """cpickle_protocol specifies a protocol used in encoding/decoding
38 key-value fields. NOTE: The highest protocol of cpickle (value 2) is
39 avoided because it fails to encode/decode integer zero in
40 python-2.7.10, gcc-4.8.2, x86-64."""
41
# Called as warning_function(message, RuntimeWarning); module-level so it
# can be reassigned.
42 warning_function = warnings.warn
43
44 """warning_function specifies the function used to issue warnings."""
45
# Read by the map/reduce callback wrappers: selects the status returned to C
# after a Python callback raises (0 when True, -1 when False).
46 ignore_exceptions_in_map_fn = True
47
48 """ignore_exceptions_in_map_fn=True makes exceptions raised in mapper/reducer functions ignored (they are still reported as warnings)."""
49
# Read by the map/reduce callback wrappers: when True, traceback.print_exc()
# is called after a Python callback raises.
50 print_backtrace_in_map_fn = True
51
52 """print_backtrace_in_map_fn=True makes backtraces printed when
53 exceptions are raised in mapper/reducer functions."""
54
# Read by KVS._encode_content/_decode_content for "cstring" fields: when
# True a "\0" is appended on encoding and the last byte is dropped on
# decoding.
55 force_null_terminate_in_cstring = True
56
57 """force_null_terminate_in_cstring specifies whether to add a
58 null-terminator to C strings. Do not change it while some KVS'es are live."""
59
# Prototype for kmr_init_2, which is called once at import time.
60 kmrso.kmr_init_2.argtypes = [ctypes.c_int]
61 kmrso.kmr_init_2.restype = ctypes.c_int
62
63
64
# NOTE(review): the meaning of the argument 0 is defined by the KMR C API;
# the return value is ignored here — confirm against the C documentation.
65 kmrso.kmr_init_2(0)
66
# ctypes aliases used in the prototypes below.  KMR contexts and KVS'es are
# handled as opaque char pointers; vectors and function pointers as void
# pointers.
67 _c_pointer = ctypes.POINTER(ctypes.c_char)
68 _c_kmr = _c_pointer
69 _c_kvs = _c_pointer
70 _c_kvsvec = ctypes.c_void_p
71 _c_boxvec = ctypes.c_void_p
72 _c_fnp = ctypes.c_void_p
73 _c_void_p = ctypes.c_void_p
74 _c_ubyte = ctypes.c_ubyte
75 _c_bool = ctypes.c_bool
76 _c_int = ctypes.c_int
77 _c_uint = ctypes.c_uint
78 _c_long = ctypes.c_long
79 _c_uint8 = ctypes.c_uint8
80 _c_uint32 = ctypes.c_uint32
81 _c_uint64 = ctypes.c_uint64
82 _c_double = ctypes.c_double
83 _c_size_t = ctypes.c_size_t
84 _c_string = ctypes.c_char_p
85
86
87
88
# The class of foreign functions loaded from kmrso; the callback wrappers
# return instances of it unchanged instead of wrapping them.
89 _c_funcptr = type(kmrso.kmr_init_2)
90
91
92
# A null pointer, assigned to _ckmr/_ckvs once the C object is released.
93 _c_null_pointer_value = _c_pointer()
94
# NOTE(review): the "def" line of this function is missing from this
# listing; callers use it as _c_null_pointer(p).
96 """Returns true if ctypes pointer is null."""
97
# A NULL ctypes pointer is falsy.
98 return (not bool(p))
99
# NOTE(review): the "def" line is missing from this listing; callers use it
# as _string_of_options(o).  Produces e.g. "_c_option(inspect=1)".
101 """Returns a print string of options for _c_option,
102 _c_file_option, and _c_spawn_option."""
103
104 prefix = o.__class__.__name__
105 attrs = o.__class__._fields_
106 ss = []
# List only the bit-fields equal to 1, skipping the gap16/gap32 padding.
107 for (f, _, _) in attrs:
108 if ((f not in ["gap16", "gap32"]) and getattr(o, f) == 1):
109 ss.append(f + "=1")
110 return (prefix + "(" + (",".join(ss)) + ")")
111
# NOTE(review): the "class" line is missing from this listing; the
# bit-field _fields_ suggest a ctypes.Structure subclass — confirm.
113 """kmr_option."""
114
# Passed by value to the C routines (see the argtypes declarations);
# gap16/gap32 are padding and cannot be set through __init__.
115 _fields_ = [
116 ("nothreading", _c_uint8, 1),
117 ("inspect", _c_uint8, 1),
118 ("keep_open", _c_uint8, 1),
119 ("key_as_rank", _c_uint8, 1),
120 ("rank_zero", _c_uint8, 1),
121 ("collapse", _c_uint8, 1),
122 ("take_ckpt", _c_uint8, 1),
123 ("gap16", _c_uint, 16),
124 ("gap32", _c_uint, 32)]
125
# Sets the bit-fields named in opts.  "key", "value" and "output" are
# skipped (they are read by _get_options).  A non-empty enabledlist
# restricts the accepted options; anything else raises Exception.
126 - def __init__(self, opts=None, enabledlist=None):
127 super(_c_option, self).__init__()
128 if opts is None: opts = {}
129 if enabledlist is None: enabledlist = []
130
131 for o, v in opts.iteritems():
132 if (o in ["key", "value", "output"]):
133
134 pass
135 elif (enabledlist != [] and (o not in enabledlist)):
136 raise Exception("Bad option: %s" % o)
137 elif (o == "nothreading"):
138 self.nothreading = v
139 elif (o == "inspect"):
140 self.inspect = v
141 elif (o == "keep_open"):
142 self.keep_open = v
143 elif (o == "key_as_rank"):
144 self.key_as_rank = v
145 elif (o == "rank_zero"):
146 self.rank_zero = v
147 elif (o == "collapse"):
148 self.collapse = v
149 elif (o == "take_ckpt"):
150 self.take_ckpt = v
151 else:
152 raise Exception("Bad option: %s" % o)
153 return
154
# NOTE(review): this method's header is missing from the listing
# (presumably __str__); it renders the options that are set.
156 return _string_of_options(self)
157
# NOTE(review): the "class" line is missing from this listing; presumably a
# ctypes.Structure subclass mirroring kmr_file_option — confirm.
159 """kmr_file_option."""
160
161 _fields_ = [
162 ("each_rank", _c_uint8, 1),
163 ("subdirectories", _c_uint8, 1),
164 ("list_file", _c_uint8, 1),
165 ("shuffle_names", _c_uint8, 1),
166 ("gap16", _c_uint, 16),
167 ("gap32", _c_uint, 32)]
168
# Sets the bit-fields named in opts.  Only "key" and "output" are skipped
# here (unlike _c_option, "value" is rejected).  A non-empty enabledlist
# restricts the accepted options; anything else raises Exception.
169 - def __init__(self, opts=None, enabledlist=None):
170 super(_c_file_option, self).__init__()
171 if opts is None: opts = {}
172 if enabledlist is None: enabledlist = []
173
174 for o, v in opts.iteritems():
175 if (o == "key" or o == "output"):
176
177 pass
178 elif (enabledlist != [] and (o not in enabledlist)):
179 raise Exception("Bad option: %s" % o)
180 elif (o == "each_rank"):
181 self.each_rank = v
182 elif (o == "subdirectories"):
183 self.subdirectories = v
184 elif (o == "list_file"):
185 self.list_file = v
186 elif (o == "shuffle_names"):
187 self.shuffle_names = v
188 else:
189 raise Exception("Bad option: %s" % o)
190 return
191
# NOTE(review): header missing from the listing (presumably __str__).
193 return _string_of_options(self)
194
196 """kmr_spawn_option."""
197
198 _fields_ = [
199 ("separator_space", _c_uint8, 1),
200 ("reply_each", _c_uint8, 1),
201 ("reply_root", _c_uint8, 1),
202 ("one_by_one", _c_uint8, 1),
203 ("take_ckpt", _c_uint8, 1),
204 ("gap16", _c_uint, 16),
205 ("gap32", _c_uint, 32)]
206
207 - def __init__(self, opts=None, enabledlist=None):
208 super(_c_spawn_option, self).__init__()
209 if opts is None: opts = {}
210 if enabledlist is None: enabledlist = []
211
212 for o, v in opts.iteritems():
213 if (o == "key" or o == "output"):
214
215 pass
216 elif (enabledlist != [] and (o not in enabledlist)):
217 raise Exception("Bad option: %s" % o)
218 elif (o == "separator_space"):
219 self.separator_space = v
220 elif (o == "reply_each"):
221 self.reply_each = v
222 elif (o == "reply_root"):
223 self.reply_root = v
224 elif (o == "one_by_one"):
225 self.one_by_one = v
226 elif (o == "take_ckpt"):
227 self.take_ckpt = v
228 else:
229 raise Exception("Bad option: %s" % o)
230 return
231
233 return _string_of_options(self)
234
# NOTE(review): the "class" line is missing from this listing; presumably a
# ctypes.Union (the docstring describes a C union) — confirm.
# Only one member is meaningful per field, chosen by the KVS field type
# (see KVS._encode_content / _decode_content).
236 """kmr_unit_sized {const char *p; long i; double d;}."""
237
238 _fields_ = [
239 ("p", _c_string),
240 ("i", _c_long),
241 ("d", _c_double)]
242
# NOTE(review): the "class" line is missing from this listing; presumably a
# ctypes.Structure mirroring kmr_kv_box — confirm.
244 """kmr_kv_box {int klen, vlen; kmr_unit_sized k, v;}."""
245
246 _fields_ = [
247 ("klen", _c_int),
248 ("vlen", _c_int),
249 ("k", _c_unitsized),
250 ("v", _c_unitsized)]
251
252
253
254
257
# Fills all four fields and returns self, so it can be chained as
# _c_kvbox().set(...) (as KVS.add_kv does).
258 - def set(self, klen, key, vlen, val):
259 self.klen = klen
260 self.vlen = vlen
261 self.k = key
262 self.v = val
263 return self
264
# Queries of MPI type sizes and constant values by name; used by the MPI
# constants setup below.
265 kmrso.kmr_mpi_type_size.argtypes = [_c_string]
266 kmrso.kmr_mpi_type_size.restype = _c_size_t
267
268 kmrso.kmr_mpi_constant_value.argtypes = [_c_string]
269 kmrso.kmr_mpi_constant_value.restype = _c_uint64
270
271
272
273
# NOTE(review): the "def _setup_mpi_constants():" line is missing from this
# listing; the function is called right after its definition.
# Picks the unsigned ctypes integer type with the given byte size (8 or 4);
# any other size raises Exception, so the trailing "return None" is
# unreachable.
275 def c_type_by_size(siz):
276 if (siz == ctypes.sizeof(_c_uint64)):
277 return _c_uint64
278 elif (siz == ctypes.sizeof(_c_uint32)):
279 return _c_uint32
280 else:
281 raise Exception("Bad type size unknown: %d" % siz)
282 return None
# Publishes ctypes types sized like MPI_Comm/MPI_Info and the values of
# MPI_COMM_WORLD, MPI_COMM_SELF and MPI_INFO_NULL as module globals, used
# by the prototypes and KMR() below.
283 global _c_mpi_comm, _c_mpi_info
284 global _mpi_comm_world, _mpi_comm_self, _mpi_info_null
285 siz = kmrso.kmr_mpi_type_size("MPI_Comm")
286 _c_mpi_comm = c_type_by_size(siz)
287 siz = kmrso.kmr_mpi_type_size("MPI_Info")
288 _c_mpi_info = c_type_by_size(siz)
289 _mpi_comm_world = kmrso.kmr_mpi_constant_value("MPI_COMM_WORLD")
290 _mpi_comm_self = kmrso.kmr_mpi_constant_value("MPI_COMM_SELF")
291 _mpi_info_null = kmrso.kmr_mpi_constant_value("MPI_INFO_NULL")
292 return
293
294 _setup_mpi_constants()
295
# ctypes prototypes for context management and KVS creation/filling.
# restype None discards the C return value.
296 kmrso.kmr_fin.argtypes = []
297 kmrso.kmr_fin.restype = _c_int
298
299 kmrso.kmr_create_context.argtypes = [_c_mpi_comm, _c_mpi_info, _c_string]
300 kmrso.kmr_create_context.restype = _c_pointer
301
302 kmrso.kmr_create_dummy_context.argtypes = []
303 kmrso.kmr_create_dummy_context.restype = _c_pointer
304
305 kmrso.kmr_free_context.argtypes = [_c_kmr]
306 kmrso.kmr_free_context.restype = None
307
308 kmrso.kmr_set_option_by_strings.argtypes = [_c_kmr, _c_string, _c_string]
309 kmrso.kmr_set_option_by_strings.restype = None
310
# The last three arguments are the caller's file, line and function name
# (see KVS.__init__).
311 kmrso.kmr_create_kvs7.argtypes = [
312 _c_kmr, _c_int, _c_int, _c_option, _c_string, _c_int, _c_string]
313 kmrso.kmr_create_kvs7.restype = _c_kvs
314
# The key-value box is passed by value.
315 kmrso.kmr_add_kv.argtypes = [_c_kvs, _c_kvbox]
316 kmrso.kmr_add_kv.restype = None
317
318 kmrso.kmr_add_kv_done.argtypes = [_c_kvs]
319 kmrso.kmr_add_kv_done.restype = None
320
321 kmrso.kmr_get_element_count.argtypes = [_c_kvs]
322 kmrso.kmr_get_element_count.restype = _c_long
323
324 kmrso.kmr_local_element_count.argtypes = [_c_kvs]
325 kmrso.kmr_local_element_count.restype = _c_long
326
# ctypes prototypes for the mapping and reduction routines.  _c_fnp slots
# receive the callbacks built by _wrap_mapfn/_wrap_redfn (or 0 for none).
# kmr_map9/kmr_reduce9 additionally take the caller's file, line and
# function name as their last three arguments.
327 kmrso.kmr_map9.argtypes = [
328 _c_bool, _c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp,
329 _c_string, _c_int, _c_string]
330 kmrso.kmr_map9.restype = None
331
332 kmrso.kmr_map_once.argtypes = [_c_kvs, _c_void_p, _c_option, _c_bool, _c_fnp]
333 kmrso.kmr_map_once.restype = None
334
335 kmrso.kmr_map_rank_by_rank.argtypes = [
336 _c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp]
337 kmrso.kmr_map_rank_by_rank.restype = None
338
339 kmrso.kmr_map_for_some.argtypes = [
340 _c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp]
341 kmrso.kmr_map_for_some.restype = None
342
# kmr_map_ms/kmr_map_ms_commands keep restype _c_int: KVS.map_ms and
# map_ms_commands loop until these return 0.
343 kmrso.kmr_map_ms.argtypes = [_c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp]
344 kmrso.kmr_map_ms.restype = _c_int
345
346 kmrso.kmr_map_ms_commands.argtypes = [
347 _c_kvs, _c_kvs, _c_void_p, _c_option, _c_spawn_option, _c_fnp]
348 kmrso.kmr_map_ms_commands.restype = _c_int
349
350 kmrso.kmr_map_via_spawn.argtypes = [
351 _c_kvs, _c_kvs, _c_void_p, _c_mpi_info, _c_spawn_option, _c_fnp]
352 kmrso.kmr_map_via_spawn.restype = None
353
354 kmrso.kmr_map_processes.argtypes = [
355 _c_bool, _c_kvs, _c_kvs, _c_void_p, _c_mpi_info, _c_spawn_option, _c_fnp]
356 kmrso.kmr_map_processes.restype = None
357
358 kmrso.kmr_reduce9.argtypes = [
359 _c_bool, _c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp,
360 _c_string, _c_int, _c_string]
361 kmrso.kmr_reduce9.restype = None
362
363 kmrso.kmr_reduce_as_one.argtypes = [
364 _c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp]
365 kmrso.kmr_reduce_as_one.type = None
366
# ctypes prototypes for data movement, spawning support, file reading,
# (de)serialization and introspection.
# NOTE(review): kmr_free_kvs (used by KVS.free) has no prototype in the
# visible part of this file, so it is called with default conversions.
367 kmrso.kmr_shuffle.argtypes = [_c_kvs, _c_kvs, _c_option]
368 kmrso.kmr_shuffle.restype = None
369
370 kmrso.kmr_replicate.argtypes = [_c_kvs, _c_kvs, _c_option]
371 kmrso.kmr_replicate.restype = None
372
373 kmrso.kmr_distribute.argtypes = [_c_kvs, _c_kvs, _c_bool, _c_option]
374 kmrso.kmr_distribute.restype = None
375
376 kmrso.kmr_concatenate_kvs.argtypes = [_c_kvsvec, _c_int, _c_kvs, _c_option]
377 kmrso.kmr_concatenate_kvs.restype = None
378
379 kmrso.kmr_reverse.argtypes = [_c_kvs, _c_kvs, _c_option]
380 kmrso.kmr_reverse.restype = None
381
382 kmrso.kmr_sort.argtypes = [_c_kvs, _c_kvs, _c_option]
383 kmrso.kmr_sort.restype = None
384
385 kmrso.kmr_sort_locally.argtypes = [_c_kvs, _c_kvs, _c_bool, _c_option]
386 kmrso.kmr_sort_locally.restype = None
387
388 kmrso.kmr_reply_to_spawner.argtypes = [_c_kmr]
389 kmrso.kmr_reply_to_spawner.restype = None
390
391 kmrso.kmr_send_kvs_to_spawner.argtypes = [_c_kmr, _c_kvs]
392 kmrso.kmr_send_kvs_to_spawner.restype = None
393
# The context is declared as a void pointer here; the result is an integer
# sized like MPI_Comm.
394 kmrso.kmr_get_spawner_communicator.argtypes = [_c_void_p, _c_long]
395 kmrso.kmr_get_spawner_communicator.restype = _c_mpi_comm
396
397 kmrso.kmr_read_files_reassemble.argtypes = [
398 _c_kmr, _c_string, _c_int, _c_uint64, _c_uint64,
399 ctypes.POINTER(_c_void_p), ctypes.POINTER(_c_uint64)]
400 kmrso.kmr_read_files_reassemble.restype = None
401
402 kmrso.kmr_read_file_by_segments.argtypes = [
403 _c_kmr, _c_string, _c_int,
404 ctypes.POINTER(_c_void_p), ctypes.POINTER(_c_uint64)]
405 kmrso.kmr_read_file_by_segments.restype = None
406
407 kmrso.kmr_save_kvs.argtypes = [
408 _c_kvs, ctypes.POINTER(_c_void_p), ctypes.POINTER(_c_size_t), _c_option]
409 kmrso.kmr_save_kvs.restype = None
410
411 kmrso.kmr_restore_kvs.argtypes = [
412 _c_kvs, _c_void_p, _c_size_t, _c_option]
413 kmrso.kmr_restore_kvs.restype = None
414
415 kmrso.kmr_dump_kvs.argtypes = [_c_kvs, _c_int]
416 kmrso.kmr_dump_kvs.restype = None
417
# Return one of the kmr_kv_field_* codes read below.
418 kmrso.kmr_get_key_type_ff.argtypes = [_c_kvs]
419 kmrso.kmr_get_key_type_ff.restype = _c_int
420
421 kmrso.kmr_get_value_type_ff.argtypes = [_c_kvs]
422 kmrso.kmr_get_value_type_ff.restype = _c_int
423
424 kmrso.kmr_get_nprocs.argtypes = [_c_kmr]
425 kmrso.kmr_get_nprocs.restype = _c_int
426
427 kmrso.kmr_get_rank.argtypes = [_c_kmr]
428 kmrso.kmr_get_rank.restype = _c_int
429
430 kmrso.kmr_mfree.argtypes = [_c_void_p, _c_size_t]
431 kmrso.kmr_mfree.restype = None
432
433 kmrso.kmr_stringify_options.argtypes = [_c_option]
434 kmrso.kmr_stringify_options.restype = _c_string
435
436 kmrso.kmr_stringify_file_options.argtypes = [_c_file_option]
437 kmrso.kmr_stringify_file_options.restype = _c_string
438
439 kmrso.kmr_stringify_spawn_options.argtypes = [_c_spawn_option]
440 kmrso.kmr_stringify_spawn_options.restype = _c_string
441
442
443
# Integer codes of the key/value field types, read from libkmr.so globals.
444 _kv_bad = ctypes.c_int.in_dll(kmrso, "kmr_kv_field_bad").value
445 _kv_opaque = ctypes.c_int.in_dll(kmrso, "kmr_kv_field_opaque").value
446 _kv_cstring = ctypes.c_int.in_dll(kmrso, "kmr_kv_field_cstring").value
447 _kv_integer = ctypes.c_int.in_dll(kmrso, "kmr_kv_field_integer").value
448 _kv_float8 = ctypes.c_int.in_dll(kmrso, "kmr_kv_field_float8").value
449
# Field names of the option structures (these include gap16/gap32);
# _filter_spawn_options uses _spawn_option_list.
450 _spawn_option_list = [_k for (_k, _, _) in _c_spawn_option._fields_]
451 _file_option_list = [_k for (_k, _, _) in _c_file_option._fields_]
452
# Field type name -> C code, and its inverse (C code -> name).
453 _field_name_type_map = {
454 "opaque" : _kv_opaque, "cstring" : _kv_cstring,
455 "integer" : _kv_integer, "float8" : _kv_float8}
456
457 _field_type_name_map = dict(
458 map(tuple, map(reversed, _field_name_type_map.items())))
459
460
461
# C callback types returning int.  A mapper receives a key-value box by
# value, the input/output KVS, an argument pointer and an index; a reducer
# receives the address of a box vector, its length, the input/output KVS
# and an argument pointer.
462 _MKMAPFN = ctypes.CFUNCTYPE(_c_int, _c_kvbox, _c_kvs, _c_kvs,
463 _c_void_p, _c_long)
464 _MKREDFN = ctypes.CFUNCTYPE(_c_int, _c_boxvec, _c_long,
465 _c_kvs, _c_kvs, _c_void_p)
466
467
468
# The same signatures spelled as argtypes/restype values.
469 _c_mapfn_argtypes = [_c_kvbox, _c_kvs, _c_kvs, _c_void_p, _c_long]
470 _c_mapfn_restype = _c_int
471
472 _c_redfn_argtypes = [_c_boxvec, _c_long, _c_kvs, _c_kvs, _c_void_p]
473 _c_redfn_restype = _c_int
474
476 """Returns a closure which calls a given Python map-function on
477 the unmarshalled contents in KVS."""
478
479 if (pyfn is None):
480 return 0
481 elif (isinstance(pyfn, _c_funcptr)):
482 return pyfn
483 else:
484 def applyfn(cbox, ckvi, ckvo, carg, cindex):
485 kvi = KVS(ckvi)
486 kvo = KVS(ckvo)
487 key = kvi._decode_content(cbox.klen, cbox.k, "key")
488 val = kvi._decode_content(cbox.vlen, cbox.v, "value")
489 try:
490 pyfn((key, val), kvi, kvo, cindex)
491 except:
492 warning_function(("Exception in Python callbacks: %s"
493 % str(sys.exc_info()[1])),
494 RuntimeWarning)
495 if (print_backtrace_in_map_fn): traceback.print_exc()
496 return (0 if ignore_exceptions_in_map_fn else -1)
497 return _MKMAPFN(applyfn)
498
500 """Returns a closure which calls a given Python reduce-function on
501 the unmarshalled contents in KVS."""
502
503 if (pyfn is None):
504 return 0
505 elif (isinstance(pyfn, _c_funcptr)):
506 return pyfn
507 else:
508 def applyfn(cboxvec, n, ckvi, ckvo, carg):
509 kvi = KVS(ckvi)
510 kvo = KVS(ckvo)
511 kvvec = []
512 for i in range(0, n):
513 pos = (cboxvec + ctypes.sizeof(_c_kvbox) * i)
514 cbox = _c_kvbox.from_address(pos)
515 key = kvi._decode_content(cbox.klen, cbox.k, "key")
516 val = kvi._decode_content(cbox.vlen, cbox.v, "value")
517 kvvec.append((key, val))
518 try:
519 pyfn(kvvec, kvi, kvo)
520 except:
521 warning_function(("Exception in Python callbacks: %s"
522 % str(sys.exc_info()[1])),
523 RuntimeWarning)
524 if (print_backtrace_in_map_fn): traceback.print_exc()
525 return (0 if ignore_exceptions_in_map_fn else -1)
526 return _MKREDFN(applyfn)
527
# NOTE(review): the "def" line is missing from this listing; the body reads
# opts (a keyword dict) and with_keyty_valty.
# When with_keyty_valty is false, key=/value= are rejected.  Defaults:
# key="opaque", value="opaque", output=True.
529 """Returns a triple of the options: a key field type, a value
530 field type, and a flag of needs of output generation."""
531
532 if ((not with_keyty_valty) and (("key" in opts) or ("value" in opts))):
533 raise Exception("Bad option: key= or value= not allowed")
534 keyty = opts.get("key", "opaque")
535 valty = opts.get("value", "opaque")
536 mkkvo = opts.get("output", True)
537 return (keyty, valty, mkkvo)
538
# NOTE(review): the "def" line is missing from this listing.
# Returns (file name, current line number, function name) of a Python
# frame object; passed to the C routines for diagnostics.
540 sp = frame
541 co = sp.f_code
542 return (co.co_filename, sp.f_lineno, co.co_name)
543
# NOTE(review): the "def" line is missing from this listing.
# Options whose names are kmr_spawn_option fields go to the first dict;
# everything else (including key=/value=/output=) goes to the second.
545 """Returns a pair of dictionaries, the 1st holds options to spawn,
546 and the 2nd holds the other options."""
547
548 sopts = dict()
549 mopts = dict()
550 for o, v in opts.iteritems():
551 if (o in _spawn_option_list):
552 sopts[o] = v
553 else:
554 mopts[o] = v
555 return (sopts, mopts)
556
# NOTE(review): the "class KMR" line and the "def __init__" line are missing
# from this listing; the body reads comm and info.
558 """KMR context."""
559
560
561
562
564 """Makes a KMR context with a given MPI communicator (comm),
565 which is used in succeeding operations. Info specifies its
566 options by MPI_Info. Arguments of comm/info are passed as a
567 long integer (assuming either an integer (int) or a pointer in
568 C). It also accepts a string "dummy" or "world" as a comm
569 argument."""
570
# A non-integer info is replaced by MPI_INFO_NULL (warned on rank 0 unless
# it is None).
571 if (isinstance(info, (int, long))):
572 warninfo = False
573 cinfo = info
574 else:
575 warninfo = (info != None)
576 cinfo = _mpi_info_null
# "dummy" selects MPI_COMM_SELF and "world" MPI_COMM_WORLD; any other
# non-integer comm falls back to MPI_COMM_WORLD with a warning on rank 0.
577 if (isinstance(comm, (int, long))):
578 warncomm = False
579 ccomm = comm
580 elif (comm == "dummy"):
581 warncomm = False
582 ccomm = _mpi_comm_self
583 elif (comm == "world"):
584 warncomm = False
585 ccomm = _mpi_comm_world
586 else:
587 warncomm = True
588 ccomm = _mpi_comm_world
589
590 self._ckmr = kmrso.kmr_create_context(ccomm, cinfo, "")
591
592 """self._ckmr holds the C part of a KMR context."""
593
594 if (_c_null_pointer(self._ckmr)):
595 raise Exception("kmr_create_context: failed")
596
597 self._dismissed = False
598
599 """self._dismissed=True disables freeing KVS'es (by memory
600 management) which remain unconsumed after dismissing a KMR
601 context. It is because freeing them causes referencing
602 dangling pointers in C."""
603
# KVS.free() returns the KVS itself with its C pointer set to null, so
# emptykvs is a placeholder KVS object without a live C KVS.
604 self.emptykvs = KVS(self).free()
605
606 """self.emptykvs holds an empty KVS needed by map_once,
607 map_on_rank_zero, read_files_reassemble, and
608 read_file_by_segments."""
609
610 self.nprocs = kmrso.kmr_get_nprocs(self._ckmr)
611
612 """self.nprocs holds an nprocs of MPI."""
613
614 self.rank = kmrso.kmr_get_rank(self._ckmr)
615
616 """self.rank holds a rank of MPI."""
617
618 if (warncomm and (self.rank == 0)):
619 warning_function("MPI comm ignored in KMR() constructor.", RuntimeWarning)
620 if (warninfo and (self.rank == 0)):
621 warning_function("MPI info ignored in KMR() constructor.", RuntimeWarning)
622 return
623
627
# NOTE(review): the method header is missing from this listing.
629 """Dismisses KMR (an alias of dismiss())."""
630
631 self.dismiss()
632
# NOTE(review): the method header is missing from this listing.
# Frees the C context once (safe to call again), then marks the context as
# dismissed so KVS.free() skips kmr_free_kvs for KVS'es left behind.
634 """Dismisses KMR."""
635
636 if (not _c_null_pointer(self._ckmr)):
637 kmrso.kmr_free_context(self._ckmr)
638 self._ckmr = _c_null_pointer_value
639 self._dismissed = True
640 self.emptykvs = None
641 self.nprocs = -1
642 self.rank = -1
643 return
644
646 """Makes a new KVS (an alias of make_kvs())."""
647
648 self.make_kvs(**opts)
649
# NOTE(review): the method header is missing from this listing.
# Accepts key= and value= field type names (default "opaque").
651 """Makes a new KVS."""
652
653 (keyty, valty, _) = _get_options(opts, True)
654 return KVS(self, keyty, valty)
655
# NOTE(review): the method header is missing from this listing.
657 """Sends a reply message from a spawned process."""
658
659 kmrso.kmr_reply_to_spawner(self._ckmr)
660 return
661
# NOTE(review): the method header is missing from this listing.
# Returns the communicator as an integer sized like MPI_Comm.
663 """Obtains a parent communicator of a spawned process."""
664
665 return kmrso.kmr_get_spawner_communicator(self._ckmr, index)
666
# NOTE(review): the method header is missing from this listing.
# The prototype's restype is None, so this returns None.
668 """Sends the KVS from a spawned process to the spawner."""
669
670 return kmrso.kmr_send_kvs_to_spawner(self._ckmr, kvs._ckvs)
671
# NOTE(review): the method header is missing from this listing.
673 """Sets KMR option, taking both arguments by strings."""
674
675 kmrso.kmr_set_option_by_strings(self._ckmr, k, v)
676 return
677
# Per-operation whitelists passed as enabledlist to _c_option by the KVS
# methods.
# NOTE(review): _enabled_options_of_shuffle is used by KVS.replicate, while
# KVS.shuffle uses _enabled_options_of_reduce — confirm this is intended.
678 _enabled_options_of_map = [
679 "nothreading", "inspect", "keep_open", "take_ckpt"]
680
681 _enabled_options_of_map_once = [
682 "nothreading", "keep_open", "take_ckpt"]
683
684 _enabled_options_of_map_ms = [
685 "nothreading", "keep_open"]
686
687 _enabled_options_of_reduce = [
688 "nothreading", "inspect", "take_ckpt"]
689
690 _enabled_options_of_reduce_as_one = [
691 "inspect", "take_ckpt"]
692
693 _enabled_options_of_shuffle = [
694 "inspect", "rank_zero", "take_ckpt"]
695
696 _enabled_options_of_distribute = [
697 "nothreading", "inspect", "keep_open"]
698
699 _enabled_options_of_sort_locally = [
700 "nothreading", "inspect", "key_as_rank"]
701
702 _enabled_options_of_sort = [
703 "inspect"]
704
# NOTE(review): the "class KVS" line is missing from this listing.
706 """KVS. Note that there are dummy KVS'es which are temporarily
707 created to hold the C structure of the KVS passed to
708 mapper/reducer functions. A dummy KVS has None in its "mr"
709 attribute."""
710
711
712
# Two modes: given a KMR, a new C KVS is created with kmr_create_kvs7,
# passing the calling frame's file/line/function; given a C KVS pointer, a
# dummy KVS (mr=None) wraps it.  An unknown type name raises KeyError.
713 - def __init__(self, kmr_or_ckvs, keyty="opaque", valty="opaque"):
714 """Makes a KVS for a given KMR. A KVS is created by
715 specifying the datatypes stored in the key and the value,
716 using the keywords "key=" and "value=". The datatype name is
717 a string, one of "opaque", "cstring", "integer", and "float8".
718 Thus, most mappers and reducers (precisely, the ones which
719 accept a function argument) take keyword arguments with the
720 defaults key="opaque" and value="opaque". The datatypes
721 affect the sorting order. Do not call constructors directly,
722 but via KMR.make_kvs()."""
723
724 self.mr = None
725
726 """mr attribute holds a KMR context object. Note that mr is
727 not accessible from mapping/reducing functions."""
728
729 if isinstance(kmr_or_ckvs, KMR):
730 kf = _field_name_type_map[keyty]
731 vf = _field_name_type_map[valty]
732 top = inspect.currentframe().f_back
733 self.mr = kmr_or_ckvs
734 (f, l, n) = _make_frame_info(top)
735 self._ckvs = kmrso.kmr_create_kvs7(
736 self.mr._ckmr, kf, vf, _c_option(), f, l, n)
737 elif isinstance(kmr_or_ckvs, _c_pointer):
738
739 self.mr = None
740 self._ckvs = kmr_or_ckvs
741 else:
742 raise Exception("Bad call to kvs constructor")
743
# NOTE(review): the method header is missing from this listing (presumably
# __del__).  Frees only non-dummy KVS'es whose C pointer is still live.
745 if ((not self._is_dummy()) and (not _c_null_pointer(self._ckvs))):
746 self.free()
747 return
748
# NOTE(review): the method header is missing from this listing.
# Raises on dummy or already-freed KVS'es.  When the owning KMR has been
# dismissed, kmr_free_kvs is skipped (see KMR._dismissed).  Returns self.
750 """Finishes the C part of a KVS."""
751
752 if (self._is_dummy()):
753 raise Exception("Bad call to free_kvs on dummy KVS")
754 elif (_c_null_pointer(self._ckvs)):
755 raise Exception("Bad call to free_kvs on freed KVS")
756 elif ((not self.mr is None) and self.mr._dismissed):
757
758 pass
759 else:
760 kmrso.kmr_free_kvs(self._ckvs)
761 self._ckvs = _c_null_pointer_value
762 return self
763
# NOTE(review): the method header is missing from this listing.
# A KVS is a dummy when it has no owning KMR context.
765 return (self.mr is None)
766
771
772 - def _encode_content(self, o, key_or_value):
773 """Marshalls an object with regard to the field type. It
774 retuns a 3-tuple, with length, value-union, and the 3nd to
775 keep a reference to a buffer."""
776
777 kvty = self.get_field_type(key_or_value)
778 u = _c_unitsized()
779 if (kvty == "opaque"):
780 data = cPickle.dumps(o, cpickle_protocol)
781 u.p = data
782 return (len(data), u, data)
783 elif (kvty == "cstring"):
784 if (not isinstance(o, str)):
785 raise Exception("Not 8-bit string for cstring: %s" % o)
786
787 data = ((o + "\0") if force_null_terminate_in_cstring else o)
788 u.p = data
789 return (len(data), u, data)
790 elif (kvty == "integer"):
791 u.i = o
792 return (ctypes.sizeof(_c_long), u, None)
793 elif (kvty == "float8"):
794 u.d = o
795 return (ctypes.sizeof(_c_double), u, None)
796 else:
797 raise Exception("Bad field type: %s" % kvty)
798
# Inverse of _encode_content: siz is the field length and u the value
# union taken from a key-value box.
799 - def _decode_content(self, siz, u, key_or_value):
800 """Unmarshalls an object with regard to the field type. It
801 returns integer 0 when the length is 0 (it is for a dummy
802 key-value used in kmr_map_once() etc)."""
803
804 if (siz == 0):
805 return 0
806 else:
807 kvty = self.get_field_type(key_or_value)
808 if (kvty == "opaque"):
809 s = ctypes.string_at(u.p, siz)
810 o = cPickle.loads(s)
811 return o
812 elif (kvty == "cstring"):
813
# Drop the NUL that _encode_content appended.
814 siz1 = ((siz - 1) if force_null_terminate_in_cstring else siz)
815 s = ctypes.string_at(u.p, siz1)
816 return s
817 elif (kvty == "integer"):
818 return u.i
819 elif (kvty == "float8"):
820 return u.d
821 else:
822 raise Exception("Bad field type: %s" % kvty)
823
# NOTE(review): the method header is missing from this listing.
# key_or_value is "key" or "value"; returns one of "opaque", "cstring",
# "integer", "float8".  Raises for a freed KVS or a bad field code.
825 """Get a field type of a KVS."""
826
827 if (_c_null_pointer(self._ckvs)):
828 raise Exception("Bad KVS (null C-object)")
829 if (key_or_value == "key"):
830 kvty = kmrso.kmr_get_key_type_ff(self._ckvs)
831 elif (key_or_value == "value"):
832 kvty = kmrso.kmr_get_value_type_ff(self._ckvs)
833 else:
834 raise Exception("Bad field %s" % key_or_value)
835 if (kvty == _kv_bad):
836 raise Exception("Bad field type value %d in KVS" % kvty)
837 else:
838 return _field_type_name_map[kvty]
839
# Short alias of add_kv.
840 - def add(self, key, val):
841 """Adds a key-value pair."""
842
843 self.add_kv(key, val)
844 return
845
# NOTE(review): the method header is missing from this listing.
847 """Adds a key-value pair."""
848
849
850
# ks/vs are unused but keep the encoded buffers referenced until
# kmr_add_kv returns (see _encode_content).
851 (klen, k, ks) = self._encode_content(key, "key")
852 (vlen, v, vs) = self._encode_content(val, "value")
853 cbox = _c_kvbox().set(klen, k, vlen, v)
854 kmrso.kmr_add_kv(self._ckvs, cbox)
855 return
856
# NOTE(review): the method header is missing from this listing.
858 """Finishes adding key-value pairs."""
859
860 kmrso.kmr_add_kv_done(self._ckvs)
861 return
862
# NOTE(review): the method header is missing from this listing.
# The count is written by C into c through a pointer.
864 """Gets the total number of key-value pairs."""
865
866 c = _c_long(0)
867 kmrso.kmr_get_element_count(self._ckvs, ctypes.byref(c))
868 return c.value
869
# NOTE(review): the method header is missing from this listing.
# The count is written by C into c through a pointer.
871 """Gets the number of key-value pairs locally."""
872
873 c = _c_long(0)
874 kmrso.kmr_local_element_count(self._ckvs, ctypes.byref(c))
875 return c.value
876
# Applies fn((key, value), kvi, kvo, index) to each pair.  Returns the
# output KVS, or None with output=False.  The input is consumed via
# self._consume() unless inspect is set.
877 - def map(self, fn, **mopts):
878 """Maps simply."""
879
880 (keyty, valty, mkkvo) = _get_options(mopts, True)
881 cmopts = _c_option(mopts, _enabled_options_of_map)
882 cfn = _wrap_mapfn(fn)
883 ckvi = self._ckvs
884 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
885 ckvo = (kvo._ckvs if (kvo is not None) else None)
# Pass the caller's location (f_back), not this method's, to C.
886 (f, l, n) = _make_frame_info(inspect.currentframe().f_back)
887 kmrso.kmr_map9(0, ckvi, ckvo, 0, cmopts, cfn, *(f, l, n))
888 if (cmopts.inspect == 0): self._consume()
889 return kvo
890
# Calls fn once with a dummy pair (decoded as key=0, value=0 since its
# lengths are 0).  rank_zero_only is passed to kmr_map_once.  The
# receiver's C KVS is not used and it is not consumed.
891 - def map_once(self, rank_zero_only, fn, **mopts):
892 """Maps once with a dummy key-value pair."""
893
894
895 (keyty, valty, mkkvo) = _get_options(mopts, True)
896 cmopts = _c_option(mopts, _enabled_options_of_map_once)
897 cfn = _wrap_mapfn(fn)
898 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
899 ckvo = (kvo._ckvs if (kvo is not None) else None)
900 kmrso.kmr_map_once(ckvo, 0, cmopts, rank_zero_only, cfn)
901 return kvo
902
904 """Maps on rank0 only."""
905
906
907 return self.map_once(True, fn, *mopts)
908
# NOTE(review): the method header is missing from this listing.
# Same flow as map() but via kmr_map_rank_by_rank.
910 """Maps sequentially with rank by rank for debugging."""
911
912 (keyty, valty, mkkvo) = _get_options(mopts, True)
913 cmopts = _c_option(mopts, _enabled_options_of_map)
914 cfn = _wrap_mapfn(fn)
915 ckvi = self._ckvs
916 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
917 ckvo = (kvo._ckvs if (kvo is not None) else None)
918 kmrso.kmr_map_rank_by_rank(ckvi, ckvo, 0, cmopts, cfn)
919 if (cmopts.inspect == 0): self._consume()
920 return kvo
921
# NOTE(review): the method header is missing from this listing.
# Same flow as map() but via kmr_map_for_some.
923 """Maps until some key-value are added."""
924
925 (keyty, valty, mkkvo) = _get_options(mopts, True)
926 cmopts = _c_option(mopts, _enabled_options_of_map)
927 ckvi = self._ckvs
928 cfn = _wrap_mapfn(fn)
929 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
930 ckvo = (kvo._ckvs if (kvo is not None) else None)
931 kmrso.kmr_map_for_some(ckvi, ckvo, 0, cmopts, cfn)
932 if (cmopts.inspect == 0): self._consume()
933 return kvo
934
# Calls kmr_map_ms repeatedly until it returns 0, then always consumes the
# input ("inspect" is not an enabled option here).
935 - def map_ms(self, fn, **mopts):
936 """Maps in master-slave mode."""
937
938
939 (keyty, valty, mkkvo) = _get_options(mopts, True)
940 cmopts = _c_option(mopts, _enabled_options_of_map_ms)
941 cfn = _wrap_mapfn(fn)
942 ckvi = self._ckvs
943 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
944 ckvo = (kvo._ckvs if (kvo is not None) else None)
945 rr = 1
946 while (rr != 0):
947 rr = kmrso.kmr_map_ms(ckvi, ckvo, 0, cmopts, cfn)
948 self._consume()
949 return kvo
950
# NOTE(review): the method header is missing from this listing; the body
# reads fn and xopts.
# xopts is split into spawn options and map options; kmr_map_ms_commands is
# called until it returns 0, then the input is consumed.
952 """Maps in master-slave mode, and runs serial commands."""
953
954 (sopts, mopts) = _filter_spawn_options(xopts)
955 (keyty, valty, mkkvo) = _get_options(mopts, True)
956 cmopts = _c_option(mopts, _enabled_options_of_map_ms)
957 csopts = _c_spawn_option(sopts)
958 cfn = _wrap_mapfn(fn)
959 ckvi = self._ckvs
960 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
961 ckvo = (kvo._ckvs if (kvo is not None) else None)
962 rr = 1
963 while (rr != 0):
964 rr = kmrso.kmr_map_ms_commands(ckvi, ckvo, 0, cmopts, csopts, cfn)
965 self._consume()
966 return kvo
967
# NOTE(review): the method header is missing from this listing.
# cmopts only validates the non-spawn options; kmr_map_via_spawn receives
# MPI_INFO_NULL and the spawn options.  The input is always consumed.
969 """Maps on processes started by MPI_Comm_spawn()."""
970
971 (sopts, mopts) = _filter_spawn_options(xopts)
972 (keyty, valty, mkkvo) = _get_options(mopts, True)
973 cmopts = _c_option(mopts, _enabled_options_of_map)
974 csopts = _c_spawn_option(sopts)
975 cfn = _wrap_mapfn(fn)
976 ckvi = self._ckvs
977 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
978 ckvo = (kvo._ckvs if (kvo is not None) else None)
979 kmrso.kmr_map_via_spawn(ckvi, ckvo, 0, _mpi_info_null, csopts, cfn)
980 self._consume()
981 return kvo
982
# NOTE(review): the method header is missing from this listing; the body
# reads nonmpi, fn and sopts.
# The whole keyword dict goes both to _get_options and to _c_spawn_option.
# The input is always consumed.
984 """Maps on processes started by MPI_Comm_spawn()."""
985
986 (keyty, valty, mkkvo) = _get_options(sopts, True)
987 csopts = _c_spawn_option(sopts)
988 cfn = _wrap_mapfn(fn)
989 ckvi = self._ckvs
990 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
991 ckvo = (kvo._ckvs if (kvo is not None) else None)
992 kmrso.kmr_map_processes(nonmpi, ckvi, ckvo, 0, _mpi_info_null,
993 csopts, cfn)
994 self._consume()
995 return kvo
996
# NOTE(review): the method header is missing from this listing.
# Calls map_processes with nonmpi=False.
998 """Maps on processes started by MPI_Comm_spawn()."""
999
1000 return self.map_processes(False, fn, **sopts)
1001
# NOTE(review): the method header is missing from this listing.
# Calls map_processes with nonmpi=True.
1003 """Maps on processes started by MPI_Comm_spawn()."""
1004
1005 return self.map_processes(True, fn, **sopts)
1006
# Applies fn(kvvec, kvi, kvo) to groups of pairs.  Returns the output KVS,
# or None with output=False.  The input is consumed unless inspect is set.
1007 - def reduce(self, fn, **mopts):
1008 """Reduces key-value pairs."""
1009
1010 (keyty, valty, mkkvo) = _get_options(mopts, True)
1011 cmopts = _c_option(mopts, _enabled_options_of_reduce)
1012 cfn = _wrap_redfn(fn)
1013 ckvi = self._ckvs
1014 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1015 ckvo = (kvo._ckvs if (kvo is not None) else None)
# Pass the caller's location (f_back), not this method's, to C.
1016 (f, l, n) = _make_frame_info(inspect.currentframe().f_back)
1017 kmrso.kmr_reduce9(0, ckvi, ckvo, 0, cmopts, cfn, *(f, l, n))
1018 if (cmopts.inspect == 0): self._consume()
1019 return kvo
1020
# NOTE(review): the method header is missing from this listing.
# Same flow as reduce() but via kmr_reduce_as_one.
1022 """Reduces once as if all pairs had the same key."""
1023
1024 (keyty, valty, mkkvo) = _get_options(mopts, True)
1025 cmopts = _c_option(mopts, _enabled_options_of_reduce_as_one)
1026 cfn = _wrap_redfn(fn)
1027 ckvi = self._ckvs
1028 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1029 ckvo = (kvo._ckvs if (kvo is not None) else None)
1030 kmrso.kmr_reduce_as_one(ckvi, ckvo, 0, cmopts, cfn)
1031 if (cmopts.inspect == 0): self._consume()
1032 return kvo
1033
1035 """Reduces until some key-value are added."""
1036
1037 (keyty, valty, mkkvo) = _get_options(mopts, True)
1038 cmopts = _c_option(mopts, _enabled_options_of_reduce)
1039 cfn = _wrap_redfn(fn)
1040 ckvi = self._ckvs
1041 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1042 ckvo = (kvo._ckvs if (kvo is not None) else None)
1043
1044 (f, l, n) = _make_frame_info(inspect.currentframe())
1045 kmrso.kmr_reduce9(1, ckvi, ckvo, 0, cmopts, cfn, *(f, l, n))
1046 if (cmopts.inspect == 0): self._consume()
1047 return kvo
1048
# NOTE(review): the method header is missing from this listing.
# The output KVS has the key and value field types swapped.  key=/value=
# are rejected; output must be exactly True (asserted).
1050 """Makes a new pair by swapping the key and the value."""
1051
1052 keyty = self.get_field_type("key")
1053 valty = self.get_field_type("value")
1054 (_, _, mkkvo) = _get_options(mopts, False)
1055 cmopts = _c_option(mopts, _enabled_options_of_map)
1056 assert (mkkvo is True)
1057 ckvi = self._ckvs
1058 kvo = (KVS(self.mr, valty, keyty) if mkkvo else None)
1059 ckvo = (kvo._ckvs if (kvo is not None) else None)
1060 kmrso.kmr_reverse(ckvi, ckvo, cmopts)
1061 if (cmopts.inspect == 0): self._consume()
1062 return kvo
1063
# NOTE(review): the method header is missing from this listing.
# Output keeps the input field types; options are checked against
# _enabled_options_of_reduce.
1065 """Shuffles key-value pairs."""
1066
1067 keyty = self.get_field_type("key")
1068 valty = self.get_field_type("value")
1069 (_, _, mkkvo) = _get_options(mopts, False)
1070 cmopts = _c_option(mopts, _enabled_options_of_reduce)
1071 ckvi = self._ckvs
1072 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1073 ckvo = (kvo._ckvs if (kvo is not None) else None)
1074 kmrso.kmr_shuffle(ckvi, ckvo, cmopts)
1075 if (cmopts.inspect == 0): self._consume()
1076 return kvo
1077
# NOTE(review): the method header is missing from this listing.
# Output keeps the input field types; options are checked against
# _enabled_options_of_shuffle (which includes rank_zero).
1079 """Replicates key-value pairs to be visible on all ranks."""
1080
1081 keyty = self.get_field_type("key")
1082 valty = self.get_field_type("value")
1083 (_, _, mkkvo) = _get_options(mopts, False)
1084 cmopts = _c_option(mopts, _enabled_options_of_shuffle)
1085 ckvi = self._ckvs
1086 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1087 ckvo = (kvo._ckvs if (kvo is not None) else None)
1088 kmrso.kmr_replicate(ckvi, ckvo, cmopts)
1089 if (cmopts.inspect == 0): self._consume()
1090 return kvo
1091
1093 """Distributes pairs approximately evenly to ranks."""
1094
1095 keyty = self.get_field_type("key")
1096 valty = self.get_field_type("value")
1097 (_, _, mkkvo) = _get_options(mopts, False)
1098 cmopts = _c_option(mopts, _enabled_options_of_distribute)
1099 ckvi = self._ckvs
1100 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1101 ckvo = (kvo._ckvs if (kvo is not None) else None)
1102 kmrso.kmr_distribute(ckvi, ckvo, cyclic, cmopts)
1103 if (cmopts.inspect == 0): self._consume()
1104 return kvo
1105
1107 """Reorders key-value pairs in a single rank."""
1108
1109 keyty = self.get_field_type("key")
1110 valty = self.get_field_type("value")
1111 (_, _, mkkvo) = _get_options(mopts, False)
1112 cmopts = _c_option(mopts, _enabled_options_of_sort_locally)
1113 ckvi = self._ckvs
1114 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1115 ckvo = (kvo._ckvs if (kvo is not None) else None)
1116 kmrso.kmr_sort_locally(ckvi, ckvo, shuffling, cmopts)
1117 if (cmopts.inspect == 0): self._consume()
1118 return kvo
1119
1120 - def sort(self, **mopts):
1121 """Sorts a KVS globally."""
1122
1123 keyty = self.get_field_type("key")
1124 valty = self.get_field_type("value")
1125 (_, _, mkkvo) = _get_options(mopts, False)
1126 cmopts = _c_option(mopts, _enabled_options_of_sort)
1127 ckvi = self._ckvs
1128 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None)
1129 ckvo = (kvo._ckvs if (kvo is not None) else None)
1130 kmrso.kmr_sort(ckvi, ckvo, cmopts)
1131 if (cmopts.inspect == 0): self._consume()
1132 return kvo
1133
1135 """Concatenates a number of KVS'es to one."""
1136
1137 keyty = self.get_field_type("key")
1138 valty = self.get_field_type("value")
1139 siz = (len(morekvs) + 1)
1140 ckvsvec = (_c_kvs * siz)()
1141 ckvsvec[0] = self._ckvs
1142 for i in range(0, len(morekvs)):
1143 ckvsvec[i + 1] = morekvs[i]._ckvs
1144 cn = _c_int(siz)
1145 kvo = KVS(self.mr, keyty, valty)
1146 ckvo = kvo._ckvs
1147 kmrso.kmr_concatenate_kvs(ckvsvec, cn, ckvo, _c_option())
1148 for i in morekvs:
1149 i._consume()
1150 self._consume()
1151 return kvo
1152
1154 """Reassembles files reading by ranks."""
1155
1156 buf = _c_void_p()
1157 siz = _c_uint64(0)
1158 kmrso.kmr_read_files_reassemble(
1159 self.mr._ckmr, filename, color, offset, bytes_,
1160 ctypes.byref(buf), ctypes.byref(siz))
1161 addr = buf.value
1162 ptr = (_c_ubyte * siz.value).from_address(addr)
1163 data = bytearray(ptr)
1164 kmrso.kmr_mfree(addr, siz.value)
1165 return data
1166
1168 """Reads one file by segments and reassembles."""
1169
1170 buf = _c_void_p()
1171 siz = _c_uint64(0)
1172 kmrso.kmr_read_file_by_segments(
1173 self.mr._ckmr, filename, color,
1174 ctypes.byref(buf), ctypes.byref(siz))
1175 addr = buf.value
1176 ptr = (_c_ubyte * siz.value).from_address(addr)
1177 data = bytearray(ptr)
1178 kmrso.kmr_mfree(addr, siz.value)
1179 return data
1180
1182 """Packs locally the contents of a KVS to a byte array."""
1183
1184 buf = _c_void_p(0)
1185 siz = _c_size_t(0)
1186 kmrso.kmr_save_kvs(self._ckvs, ctypes.byref(buf), ctypes.byref(siz),
1187 _c_option())
1188 addr = buf.value
1189 ptr = (_c_ubyte * siz.value).from_address(addr)
1190 data = bytearray(ptr)
1191 kmrso.kmr_mfree(addr, siz.value)
1192 return data
1193
1195 """Unpacks locally the contents of a KVS from a byte array."""
1196
1197 kvo = KVS(self.mr, "opaque", "opaque")
1198 siz = len(data)
1199 addr = (_c_ubyte * siz).from_buffer(data)
1200 kmrso.kmr_restore_kvs(kvo._ckvs, addr, siz, _c_option())
1201 return kvo
1202
1204 """Finishes using KMR4PY."""
1205
1206 kmrso.kmr_fin()
1207 return
1208
1210 """Returns an array of LOCAL contents."""
1211
1212 a = kvs.local_element_count() * [None]
1213 def f (kv, kvi, kvo, i, *_data):
1214 a[i] = kv
1215 return 0
1216 kvo = kvs.map(f, output=False, inspect=True)
1217 assert (kvo is None)
1218 return a
1219
1221 """Checks if ctypes values are properly used."""
1222
1223 if (not _c_null_pointer(_c_null_pointer_value)):
1224 raise Exception("BAD: C null pointer has a wrong value.")
1225
1227 """Checks if the options are passed properly from Python to C."""
1228
1229 for (option, stringify) in [
1230 (_c_option, kmrso.kmr_stringify_options),
1231 (_c_file_option, kmrso.kmr_stringify_file_options),
1232 (_c_spawn_option, kmrso.kmr_stringify_spawn_options)]:
1233 for (o, _, _) in option._fields_:
1234 if ((o == "gap16") or (o == "gap32")):
1235 pass
1236 else:
1237 copts = option({o : 1})
1238 s = stringify(copts)
1239 if (o != s):
1240 raise Exception("BAD: %s != %s" % (str(o), str(s)))
1241
1242
1243
1244
1245