Module kmr4py

Source Code for Module kmr4py

   1  # kmr4py.py 
   2  # Copyright (C) 2012-2016 RIKEN AICS 
   3   
   4  """Python Binding for KMR Map-Reduce Library.  This provides 
   5  straightforward wrappers to the C routines.  See more about KMR at 
   6  "http://mt.aics.riken.jp/kmr".  All key-value data is stored in C 
   7  structures after encoding/decoding Python objects to byte arrays in C. 
   8  Documentation in Python is minimum, so please refer to the 
   9  documentation in C.""" 
  10   
  11  ## NOTE: Importing mpi4py initializes for MPI execution.  It is not 
  12  ## imported here, applications shall import it. 
  13   
  14  import warnings 
  15  import ctypes 
  16  import cPickle 
  17  import inspect 
  18  import traceback 
  19  import sys 
  20   
## Version of this binding.  It is compared (as a string) against the
## integer "kmr_version" exported by libkmr.so.

__version__ = "20160425"

## Loads the KMR shared library; every C routine is reached via kmrso.

kmrso = ctypes.CDLL("libkmr.so")

"""kmrso holds a libkmr.so library object."""

## Reads the library version and only warns (does not fail) when it
## differs from the version of this binding.

_kmrso_version = ctypes.c_int.in_dll(kmrso, "kmr_version").value
if (__version__ != str(_kmrso_version)):
    warnings.warn(("Version unmatch with libkmr.so;"
                   + " found=" + str(_kmrso_version)
                   + " required=" + __version__),
                  RuntimeWarning)
  33   
  34  #cpickle_protocol = cPickle.HIGHEST_PROTOCOL 
  35  cpickle_protocol = 0 
  36   
  37  """cpickle_protocol specifies a protocol used in encoding/decoding 
  38  key-value fields.  NOTE: The highest protocol of cpickle (value 2) is 
  39  avoided because it fails to encode/decode integer zero in 
  40  python-2.7.10, gcc-4.8.2, x86-64.""" 
  41   
  42  warning_function = warnings.warn 
  43   
  44  """warning_function specifies the function used to issue warnings.""" 
  45   
  46  ignore_exceptions_in_map_fn = True 
  47   
  48  """ignore_exceptions_in_map_fn=True makes exceptions ignored.""" 
  49   
  50  print_backtrace_in_map_fn = True 
  51   
  52  """print_backtrace_in_map_fn=True makes backtraces be printed on 
  53  exceptions in mapper/reducer functions.""" 
  54   
  55  force_null_terminate_in_cstring = True 
  56   
  57  """force_null_terminate_in_cstring specifies to add a null-terminator 
  58  in C strings.  Do not change it while some KVS'es are live.""" 
  59   
## Signature of kmr_init_2: takes one C int and returns a C int.

kmrso.kmr_init_2.argtypes = [ctypes.c_int]
kmrso.kmr_init_2.restype = ctypes.c_int

## Initializes KMR at this point (i.e. as a side effect of importing
## this module).

kmrso.kmr_init_2(0)

## Short aliases of the ctypes types used in the declarations below.
## A KMR context and a KVS are both handled as opaque "char *"
## pointers; vectors and function pointers are passed as "void *".

_c_pointer = ctypes.POINTER(ctypes.c_char)
_c_kmr = _c_pointer
_c_kvs = _c_pointer
_c_kvsvec = ctypes.c_void_p
_c_boxvec = ctypes.c_void_p
_c_fnp = ctypes.c_void_p
_c_void_p = ctypes.c_void_p
_c_ubyte = ctypes.c_ubyte
_c_bool = ctypes.c_bool
_c_int = ctypes.c_int
_c_uint = ctypes.c_uint
_c_long = ctypes.c_long
_c_uint8 = ctypes.c_uint8
_c_uint32 = ctypes.c_uint32
_c_uint64 = ctypes.c_uint64
_c_double = ctypes.c_double
_c_size_t = ctypes.c_size_t
_c_string = ctypes.c_char_p

## _c_funcptr is ctypes._FuncPtr, but it is taken indirectly
## because it is hidden.  It is used to recognize arguments that are
## already C function pointers.

_c_funcptr = type(kmrso.kmr_init_2)

## Null return values for ctypes.restype.  This null pointer instance
## is also stored in place of released C pointers.

_c_null_pointer_value = _c_pointer()
  94   
def _c_null_pointer(p):
    """Tells whether a ctypes pointer is null.  It returns True when
    the truth value of P is false (a null ctypes pointer is false),
    and False otherwise."""

    return not p
99
def _string_of_options(o):
    """Returns a print string of an option structure (_c_option,
    _c_file_option, or _c_spawn_option).  The string is the class
    name followed by the list of fields whose value is 1, such as
    "_c_option(inspect=1,keep_open=1)".  The padding fields "gap16"
    and "gap32" are never listed."""

    cls = o.__class__
    padding = ["gap16", "gap32"]
    enabled = [name + "=1"
               for (name, _, _) in cls._fields_
               if ((name not in padding) and getattr(o, name) == 1)]
    return (cls.__name__ + "(" + ",".join(enabled) + ")")
111
class _c_option(ctypes.Structure):
    """kmr_option.  It holds one-bit flags followed by the padding
    fields gap16 and gap32."""

    _fields_ = [
        ("nothreading", _c_uint8, 1),
        ("inspect", _c_uint8, 1),
        ("keep_open", _c_uint8, 1),
        ("key_as_rank", _c_uint8, 1),
        ("rank_zero", _c_uint8, 1),
        ("collapse", _c_uint8, 1),
        ("take_ckpt", _c_uint8, 1),
        ("gap16", _c_uint, 16),
        ("gap32", _c_uint, 32)]

    def __init__(self, opts=None, enabledlist=None):
        """Sets the flags from a dictionary OPTS (option name to
        value).  ENABLEDLIST, when not an empty list, restricts the
        acceptable option names.  It raises an Exception for an
        option that is unknown or not in ENABLEDLIST."""

        super(_c_option, self).__init__()
        if opts is None:
            opts = {}
        if enabledlist is None:
            enabledlist = []
        ## "key", "value", and "output" are Python binding only.
        binding_only = ("key", "value", "output")
        flags = ("nothreading", "inspect", "keep_open", "key_as_rank",
                 "rank_zero", "collapse", "take_ckpt")
        restricted = (enabledlist != [])
        for name, value in opts.iteritems():
            if (name in binding_only):
                continue
            if (restricted and (name not in enabledlist)):
                raise Exception("Bad option: %s" % name)
            if (name not in flags):
                raise Exception("Bad option: %s" % name)
            setattr(self, name, value)
        return

    def __str__(self):
        """Returns a print string listing the flags that are set."""

        return _string_of_options(self)
157
class _c_file_option(ctypes.Structure):
    """kmr_file_option.  It holds one-bit flags followed by the
    padding fields gap16 and gap32."""

    _fields_ = [
        ("each_rank", _c_uint8, 1),
        ("subdirectories", _c_uint8, 1),
        ("list_file", _c_uint8, 1),
        ("shuffle_names", _c_uint8, 1),
        ("gap16", _c_uint, 16),
        ("gap32", _c_uint, 32)]

    def __init__(self, opts=None, enabledlist=None):
        """Sets the flags from a dictionary OPTS (option name to
        value).  ENABLEDLIST, when not an empty list, restricts the
        acceptable option names.  It raises an Exception for an
        option that is unknown or not in ENABLEDLIST."""

        super(_c_file_option, self).__init__()
        if opts is None: opts = {}
        if enabledlist is None: enabledlist = []
        flags = ("each_rank", "subdirectories", "list_file",
                 "shuffle_names")
        ## Sets the options as dictionary passed.
        for o, v in opts.iteritems():
            if (o in ["key", "value", "output"]):
                ## "key", "value", and "output" are Python binding
                ## only.  ("value" is skipped as well, to agree with
                ## _c_option and with _get_options which accepts it).
                pass
            elif (enabledlist != [] and (o not in enabledlist)):
                raise Exception("Bad option: %s" % o)
            elif (o in flags):
                setattr(self, o, v)
            else:
                raise Exception("Bad option: %s" % o)
        return

    def __str__(self):
        """Returns a print string listing the flags that are set."""

        return _string_of_options(self)
194
class _c_spawn_option(ctypes.Structure):
    """kmr_spawn_option.  It holds one-bit flags followed by the
    padding fields gap16 and gap32."""

    _fields_ = [
        ("separator_space", _c_uint8, 1),
        ("reply_each", _c_uint8, 1),
        ("reply_root", _c_uint8, 1),
        ("one_by_one", _c_uint8, 1),
        ("take_ckpt", _c_uint8, 1),
        ("gap16", _c_uint, 16),
        ("gap32", _c_uint, 32)]

    def __init__(self, opts=None, enabledlist=None):
        """Sets the flags from a dictionary OPTS (option name to
        value).  ENABLEDLIST, when not an empty list, restricts the
        acceptable option names.  It raises an Exception for an
        option that is unknown or not in ENABLEDLIST."""

        super(_c_spawn_option, self).__init__()
        if opts is None: opts = {}
        if enabledlist is None: enabledlist = []
        flags = ("separator_space", "reply_each", "reply_root",
                 "one_by_one", "take_ckpt")
        ## Sets the options as dictionary passed.
        for o, v in opts.iteritems():
            if (o in ["key", "value", "output"]):
                ## "key", "value", and "output" are Python binding
                ## only.  "value" needs be skipped here, because
                ## KVS.map_processes() passes all its keyword options
                ## to this constructor, including value=.
                pass
            elif (enabledlist != [] and (o not in enabledlist)):
                raise Exception("Bad option: %s" % o)
            elif (o in flags):
                setattr(self, o, v)
            else:
                raise Exception("Bad option: %s" % o)
        return

    def __str__(self):
        """Returns a print string listing the flags that are set."""

        return _string_of_options(self)
234
class _c_unitsized(ctypes.Union):
    """kmr_unit_sized {const char *p; long i; double d;}.  The
    members overlay each other: P for a pointer to bytes, I for an
    integer, and D for a floating-point number."""

    _fields_ = [("p", _c_string),
                ("i", _c_long),
                ("d", _c_double)]
242
class _c_kvbox(ctypes.Structure):
    """kmr_kv_box {int klen, vlen; kmr_unit_sized k, v;}.  It holds
    one key-value pair: the lengths of the key and the value, and
    their contents as unit-sized unions."""

    _fields_ = [("klen", _c_int),
                ("vlen", _c_int),
                ("k", _c_unitsized),
                ("v", _c_unitsized)]

    ## NOTE: Defining __init__ with some arguments makes c-callback
    ## fail to call initializers.  Thus the fields are filled by
    ## set() after creation.

    def __init__(self):
        """Makes a box with default (zero) contents."""

        super(_c_kvbox, self).__init__()

    def set(self, klen, key, vlen, val):
        """Fills all the four fields, and returns the box itself so
        that the call can be chained after the constructor."""

        (self.klen, self.vlen) = (klen, vlen)
        (self.k, self.v) = (key, val)
        return self
kmrso.kmr_mpi_type_size.argtypes = [_c_string]
kmrso.kmr_mpi_type_size.restype = _c_size_t

kmrso.kmr_mpi_constant_value.argtypes = [_c_string]
kmrso.kmr_mpi_constant_value.restype = _c_uint64

## Import some MPI constant values.  Calling kmr_mpi_type_size and
## kmr_mpi_constant_value does not need MPI to be initialized.

def _setup_mpi_constants():
    """Sets the module globals for the ctypes types of MPI_Comm and
    MPI_Info (_c_mpi_comm, _c_mpi_info), and for the values of
    MPI_COMM_WORLD, MPI_COMM_SELF, and MPI_INFO_NULL, by asking
    libkmr.so.  It raises an Exception when the size of an MPI type
    is neither that of uint64 nor uint32."""

    def c_type_by_size(siz):
        ## Chooses an unsigned integer type of the given byte size.
        if (siz == ctypes.sizeof(_c_uint64)):
            return _c_uint64
        elif (siz == ctypes.sizeof(_c_uint32)):
            return _c_uint32
        else:
            raise Exception("Bad type size unknown: %d" % siz)

    global _c_mpi_comm, _c_mpi_info
    global _mpi_comm_world, _mpi_comm_self, _mpi_info_null
    siz = kmrso.kmr_mpi_type_size("MPI_Comm")
    _c_mpi_comm = c_type_by_size(siz)
    siz = kmrso.kmr_mpi_type_size("MPI_Info")
    _c_mpi_info = c_type_by_size(siz)
    _mpi_comm_world = kmrso.kmr_mpi_constant_value("MPI_COMM_WORLD")
    _mpi_comm_self = kmrso.kmr_mpi_constant_value("MPI_COMM_SELF")
    _mpi_info_null = kmrso.kmr_mpi_constant_value("MPI_INFO_NULL")
    return

_setup_mpi_constants()

## Signatures of the C routines.  (The declarations using
## _c_mpi_comm and _c_mpi_info need be after the call above).

kmrso.kmr_fin.argtypes = []
kmrso.kmr_fin.restype = _c_int

kmrso.kmr_create_context.argtypes = [_c_mpi_comm, _c_mpi_info, _c_string]
kmrso.kmr_create_context.restype = _c_pointer

kmrso.kmr_create_dummy_context.argtypes = []
kmrso.kmr_create_dummy_context.restype = _c_pointer

kmrso.kmr_free_context.argtypes = [_c_kmr]
kmrso.kmr_free_context.restype = None

kmrso.kmr_set_option_by_strings.argtypes = [_c_kmr, _c_string, _c_string]
kmrso.kmr_set_option_by_strings.restype = None

kmrso.kmr_create_kvs7.argtypes = [
    _c_kmr, _c_int, _c_int, _c_option, _c_string, _c_int, _c_string]
kmrso.kmr_create_kvs7.restype = _c_kvs

kmrso.kmr_add_kv.argtypes = [_c_kvs, _c_kvbox]
kmrso.kmr_add_kv.restype = None

kmrso.kmr_add_kv_done.argtypes = [_c_kvs]
kmrso.kmr_add_kv_done.restype = None

## NOTE(review): The callers in KVS pass an extra by-reference long
## to the two count routines below, which is not listed in argtypes.
## Confirm the C prototypes before tightening these declarations.

kmrso.kmr_get_element_count.argtypes = [_c_kvs]
kmrso.kmr_get_element_count.restype = _c_long

kmrso.kmr_local_element_count.argtypes = [_c_kvs]
kmrso.kmr_local_element_count.restype = _c_long

kmrso.kmr_map9.argtypes = [
    _c_bool, _c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp,
    _c_string, _c_int, _c_string]
kmrso.kmr_map9.restype = None

kmrso.kmr_map_once.argtypes = [_c_kvs, _c_void_p, _c_option, _c_bool, _c_fnp]
kmrso.kmr_map_once.restype = None

kmrso.kmr_map_rank_by_rank.argtypes = [
    _c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp]
kmrso.kmr_map_rank_by_rank.restype = None

kmrso.kmr_map_for_some.argtypes = [
    _c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp]
kmrso.kmr_map_for_some.restype = None

kmrso.kmr_map_ms.argtypes = [_c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp]
kmrso.kmr_map_ms.restype = _c_int

kmrso.kmr_map_ms_commands.argtypes = [
    _c_kvs, _c_kvs, _c_void_p, _c_option, _c_spawn_option, _c_fnp]
kmrso.kmr_map_ms_commands.restype = _c_int

kmrso.kmr_map_via_spawn.argtypes = [
    _c_kvs, _c_kvs, _c_void_p, _c_mpi_info, _c_spawn_option, _c_fnp]
kmrso.kmr_map_via_spawn.restype = None

kmrso.kmr_map_processes.argtypes = [
    _c_bool, _c_kvs, _c_kvs, _c_void_p, _c_mpi_info, _c_spawn_option, _c_fnp]
kmrso.kmr_map_processes.restype = None

kmrso.kmr_reduce9.argtypes = [
    _c_bool, _c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp,
    _c_string, _c_int, _c_string]
kmrso.kmr_reduce9.restype = None

## (It was mistakenly assigned to an attribute "type", which left
## restype at the ctypes default).

kmrso.kmr_reduce_as_one.argtypes = [
    _c_kvs, _c_kvs, _c_void_p, _c_option, _c_fnp]
kmrso.kmr_reduce_as_one.restype = None

kmrso.kmr_shuffle.argtypes = [_c_kvs, _c_kvs, _c_option]
kmrso.kmr_shuffle.restype = None

kmrso.kmr_replicate.argtypes = [_c_kvs, _c_kvs, _c_option]
kmrso.kmr_replicate.restype = None

kmrso.kmr_distribute.argtypes = [_c_kvs, _c_kvs, _c_bool, _c_option]
kmrso.kmr_distribute.restype = None

kmrso.kmr_concatenate_kvs.argtypes = [_c_kvsvec, _c_int, _c_kvs, _c_option]
kmrso.kmr_concatenate_kvs.restype = None

kmrso.kmr_reverse.argtypes = [_c_kvs, _c_kvs, _c_option]
kmrso.kmr_reverse.restype = None

kmrso.kmr_sort.argtypes = [_c_kvs, _c_kvs, _c_option]
kmrso.kmr_sort.restype = None

kmrso.kmr_sort_locally.argtypes = [_c_kvs, _c_kvs, _c_bool, _c_option]
kmrso.kmr_sort_locally.restype = None

kmrso.kmr_reply_to_spawner.argtypes = [_c_kmr]
kmrso.kmr_reply_to_spawner.restype = None

kmrso.kmr_send_kvs_to_spawner.argtypes = [_c_kmr, _c_kvs]
kmrso.kmr_send_kvs_to_spawner.restype = None

kmrso.kmr_get_spawner_communicator.argtypes = [_c_void_p, _c_long]
kmrso.kmr_get_spawner_communicator.restype = _c_mpi_comm

kmrso.kmr_read_files_reassemble.argtypes = [
    _c_kmr, _c_string, _c_int, _c_uint64, _c_uint64,
    ctypes.POINTER(_c_void_p), ctypes.POINTER(_c_uint64)]
kmrso.kmr_read_files_reassemble.restype = None

kmrso.kmr_read_file_by_segments.argtypes = [
    _c_kmr, _c_string, _c_int,
    ctypes.POINTER(_c_void_p), ctypes.POINTER(_c_uint64)]
kmrso.kmr_read_file_by_segments.restype = None

kmrso.kmr_save_kvs.argtypes = [
    _c_kvs, ctypes.POINTER(_c_void_p), ctypes.POINTER(_c_size_t), _c_option]
kmrso.kmr_save_kvs.restype = None

kmrso.kmr_restore_kvs.argtypes = [
    _c_kvs, _c_void_p, _c_size_t, _c_option]
kmrso.kmr_restore_kvs.restype = None

kmrso.kmr_dump_kvs.argtypes = [_c_kvs, _c_int]
kmrso.kmr_dump_kvs.restype = None

kmrso.kmr_get_key_type_ff.argtypes = [_c_kvs]
kmrso.kmr_get_key_type_ff.restype = _c_int

kmrso.kmr_get_value_type_ff.argtypes = [_c_kvs]
kmrso.kmr_get_value_type_ff.restype = _c_int

kmrso.kmr_get_nprocs.argtypes = [_c_kmr]
kmrso.kmr_get_nprocs.restype = _c_int

kmrso.kmr_get_rank.argtypes = [_c_kmr]
kmrso.kmr_get_rank.restype = _c_int

kmrso.kmr_mfree.argtypes = [_c_void_p, _c_size_t]
kmrso.kmr_mfree.restype = None

kmrso.kmr_stringify_options.argtypes = [_c_option]
kmrso.kmr_stringify_options.restype = _c_string

kmrso.kmr_stringify_file_options.argtypes = [_c_file_option]
kmrso.kmr_stringify_file_options.restype = _c_string

kmrso.kmr_stringify_spawn_options.argtypes = [_c_spawn_option]
kmrso.kmr_stringify_spawn_options.restype = _c_string

#receive_kvs_from_spawned_fn = kmrso.kmr_receive_kvs_from_spawned_fn

## Integer codes of the field types, as exported by libkmr.so.

_kv_bad = ctypes.c_int.in_dll(kmrso, "kmr_kv_field_bad").value
_kv_opaque = ctypes.c_int.in_dll(kmrso, "kmr_kv_field_opaque").value
_kv_cstring = ctypes.c_int.in_dll(kmrso, "kmr_kv_field_cstring").value
_kv_integer = ctypes.c_int.in_dll(kmrso, "kmr_kv_field_integer").value
_kv_float8 = ctypes.c_int.in_dll(kmrso, "kmr_kv_field_float8").value

## Field names of the option structures (the padding fields are
## included, too).

_spawn_option_list = [_k for (_k, _, _) in _c_spawn_option._fields_]
_file_option_list = [_k for (_k, _, _) in _c_file_option._fields_]

## Mapping of a field type name to its code, and its inverse.

_field_name_type_map = {
    "opaque" : _kv_opaque, "cstring" : _kv_cstring,
    "integer" : _kv_integer, "float8" : _kv_float8}

_field_type_name_map = dict(
    map(tuple, map(reversed, _field_name_type_map.items())))

## C-callable function factories.

_MKMAPFN = ctypes.CFUNCTYPE(_c_int, _c_kvbox, _c_kvs, _c_kvs,
                            _c_void_p, _c_long)
_MKREDFN = ctypes.CFUNCTYPE(_c_int, _c_boxvec, _c_long,
                            _c_kvs, _c_kvs, _c_void_p)

## Argtypes of C callback map/reduce functions.

_c_mapfn_argtypes = [_c_kvbox, _c_kvs, _c_kvs, _c_void_p, _c_long]
_c_mapfn_restype = _c_int

_c_redfn_argtypes = [_c_boxvec, _c_long, _c_kvs, _c_kvs, _c_void_p]
_c_redfn_restype = _c_int
def _wrap_mapfn(pyfn):
    """Returns a closure which calls a given Python map-function on
    the unmarshalled contents in KVS.  It returns 0 (a null function
    pointer) for None, and returns the argument as it is when it is
    already a C function.  Otherwise, the result is a C-callable
    object which calls pyfn((key, value), kvi, kvo, index).  The
    callback returns 0 to C on success.  On an exception in PYFN, it
    issues a warning and returns 0 when ignore_exceptions_in_map_fn
    is true, or -1 otherwise."""

    if (pyfn is None):
        return 0
    elif (isinstance(pyfn, _c_funcptr)):
        return pyfn
    else:
        def applyfn(cbox, ckvi, ckvo, carg, cindex):
            ## Wraps the C KVS'es by dummy KVS objects.
            kvi = KVS(ckvi)
            kvo = KVS(ckvo)
            key = kvi._decode_content(cbox.klen, cbox.k, "key")
            val = kvi._decode_content(cbox.vlen, cbox.v, "value")
            try:
                pyfn((key, val), kvi, kvo, cindex)
            except:
                ## (It catches everything, because an exception
                ## cannot propagate to the callers through C).
                warning_function(("Exception in Python callbacks: %s"
                                  % str(sys.exc_info()[1])),
                                 RuntimeWarning)
                if (print_backtrace_in_map_fn): traceback.print_exc()
                return (0 if ignore_exceptions_in_map_fn else -1)
            ## Success is always reported as 0, regardless of the
            ## setting of ignore_exceptions_in_map_fn.
            return 0
        return _MKMAPFN(applyfn)
def _wrap_redfn(pyfn):
    """Returns a closure which calls a given Python reduce-function on
    the unmarshalled contents in KVS.  It returns 0 (a null function
    pointer) for None, and returns the argument as it is when it is
    already a C function.  Otherwise, the result is a C-callable
    object which calls pyfn(kvvec, kvi, kvo), where kvvec is a list
    of (key, value) pairs.  The callback returns 0 to C on success.
    On an exception in PYFN, it issues a warning and returns 0 when
    ignore_exceptions_in_map_fn is true, or -1 otherwise."""

    if (pyfn is None):
        return 0
    elif (isinstance(pyfn, _c_funcptr)):
        return pyfn
    else:
        def applyfn(cboxvec, n, ckvi, ckvo, carg):
            ## Wraps the C KVS'es by dummy KVS objects.
            kvi = KVS(ckvi)
            kvo = KVS(ckvo)
            kvvec = []
            boxsize = ctypes.sizeof(_c_kvbox)
            for i in range(0, n):
                ## cboxvec is an address of an array of n boxes.
                pos = (cboxvec + boxsize * i)
                cbox = _c_kvbox.from_address(pos)
                key = kvi._decode_content(cbox.klen, cbox.k, "key")
                val = kvi._decode_content(cbox.vlen, cbox.v, "value")
                kvvec.append((key, val))
            try:
                pyfn(kvvec, kvi, kvo)
            except:
                ## (It catches everything, because an exception
                ## cannot propagate to the callers through C).
                warning_function(("Exception in Python callbacks: %s"
                                  % str(sys.exc_info()[1])),
                                 RuntimeWarning)
                if (print_backtrace_in_map_fn): traceback.print_exc()
                return (0 if ignore_exceptions_in_map_fn else -1)
            ## Success is always reported as 0, regardless of the
            ## setting of ignore_exceptions_in_map_fn.
            return 0
        return _MKREDFN(applyfn)
def _get_options(opts, with_keyty_valty):
    """Extracts the options of the Python binding from a dictionary
    OPTS, and returns them as a triple: a key field type (key=,
    default "opaque"), a value field type (value=, default
    "opaque"), and a flag of needs of output generation (output=,
    default True).  It raises an Exception when key= or value= is
    given while WITH_KEYTY_VALTY is false."""

    if (not with_keyty_valty):
        if (("key" in opts) or ("value" in opts)):
            raise Exception("Bad option: key= or value= not allowed")
    return (opts.get("key", "opaque"),
            opts.get("value", "opaque"),
            opts.get("output", True))
538
def _make_frame_info(frame):
    """Returns a triple describing a frame: the file name of its
    code, the current line number, and the name of its code
    (function)."""

    code = frame.f_code
    return (code.co_filename, frame.f_lineno, code.co_name)
543
def _filter_spawn_options(opts):
    """Splits a dictionary of options into two, and returns them as
    a pair.  The 1st holds the options whose names are in
    _spawn_option_list (options to spawn), and the 2nd holds the
    other options."""

    spawn_opts = {}
    other_opts = {}
    for name, value in opts.iteritems():
        target = (spawn_opts if (name in _spawn_option_list)
                  else other_opts)
        target[name] = value
    return (spawn_opts, other_opts)
556
class KMR():
    """KMR context.  It wraps a C KMR context, which is created on
    an MPI communicator and is released by dismiss()."""

    ## attributes: self._ckmr, self.nprocs, self.rank, self.emptykvs
    ## self._dismissed

    def __init__(self, comm, info=None):
        """Makes a KMR context with a given MPI communicator (comm),
        which is used in succeeding operations.  Info specifies its
        options by MPI_Info.  Arguments of comm/info are passed as a
        long integer (assuming either an integer (int) or a pointer in
        C).  It also accepts a string "dummy" or "world" as a comm
        argument.  Any other comm is replaced by the world
        communicator and any other non-None info is replaced by the
        null info, with a warning issued on rank0.  It raises an
        Exception when the C context cannot be created."""

        if (isinstance(info, (int, long))):
            warninfo = False
            cinfo = info
        else:
            ## (Identity test; "!=" would call a user-defined
            ## comparison of the info object).
            warninfo = (info is not None)
            cinfo = _mpi_info_null
        if (isinstance(comm, (int, long))):
            warncomm = False
            ccomm = comm
        elif (comm == "dummy"):
            warncomm = False
            ccomm = _mpi_comm_self
        elif (comm == "world"):
            warncomm = False
            ccomm = _mpi_comm_world
        else:
            warncomm = True
            ccomm = _mpi_comm_world

        ## self._ckmr holds the C part of a KMR context.

        self._ckmr = kmrso.kmr_create_context(ccomm, cinfo, "")

        if (_c_null_pointer(self._ckmr)):
            raise Exception("kmr_create_context: failed")

        ## self._dismissed=True disables freeing KVS'es (by memory
        ## management) which remain unconsumed after dismissing a KMR
        ## context.  It is because freeing them causes referencing
        ## dangling pointers in C.  (It needs be set before making
        ## emptykvs below, because KVS.free() refers to it).

        self._dismissed = False

        ## self.emptykvs holds an empty KVS needed by map_once,
        ## map_on_rank_zero, read_files_reassemble, and
        ## read_file_by_segments.

        self.emptykvs = KVS(self).free()

        ## self.nprocs holds an nprocs of MPI.

        self.nprocs = kmrso.kmr_get_nprocs(self._ckmr)

        ## self.rank holds a rank of MPI.

        self.rank = kmrso.kmr_get_rank(self._ckmr)

        if (warncomm and (self.rank == 0)):
            warning_function("MPI comm ignored in KMR() constructor.", RuntimeWarning)
        if (warninfo and (self.rank == 0)):
            warning_function("MPI info ignored in KMR() constructor.", RuntimeWarning)
        return

    def __del__(self):
        self.dismiss()
        return

    def free(self):
        """Dismisses KMR (an alias of dismiss())."""

        self.dismiss()

    def dismiss(self):
        """Dismisses KMR.  It frees the C context when it is live,
        and resets the attributes.  Calling it repeatedly is
        allowed."""

        ## (_ckmr is missing when the constructor failed before
        ## setting it; dismiss() is also called from __del__).
        ckmr = getattr(self, "_ckmr", None)
        if (not _c_null_pointer(ckmr)):
            kmrso.kmr_free_context(ckmr)
        self._ckmr = _c_null_pointer_value
        self._dismissed = True
        self.emptykvs = None
        self.nprocs = -1
        self.rank = -1
        return

    def create_kvs(self, **opts):
        """Makes a new KVS (an alias of make_kvs())."""

        return self.make_kvs(**opts)

    def make_kvs(self, **opts):
        """Makes a new KVS.  The field types are given by the
        keywords key= and value= (both default to "opaque")."""

        (keyty, valty, _) = _get_options(opts, True)
        return KVS(self, keyty, valty)

    def reply_to_spawner(self):
        """Sends a reply message from a spawned process."""

        kmrso.kmr_reply_to_spawner(self._ckmr)
        return

    def get_spawner_communicator(self, index):
        """Obtains a parent communicator of a spawned process."""

        return kmrso.kmr_get_spawner_communicator(self._ckmr, index)

    def send_kvs_to_spawner(self, kvs):
        """Sends the KVS from a spawned process to the spawner."""

        return kmrso.kmr_send_kvs_to_spawner(self._ckmr, kvs._ckvs)

    def set_option(self, k, v):
        """Sets KMR option, taking both arguments by strings."""

        kmrso.kmr_set_option_by_strings(self._ckmr, k, v)
        return
## Lists of the option names accepted by each kind of operations.
## They are passed to _c_option() as its enabledlist argument, which
## rejects the options not in the list.

_enabled_options_of_map = [
    "nothreading", "inspect", "keep_open", "take_ckpt"]

_enabled_options_of_map_once = [
    "nothreading", "keep_open", "take_ckpt"]

_enabled_options_of_map_ms = [
    "nothreading", "keep_open"]

_enabled_options_of_reduce = [
    "nothreading", "inspect", "take_ckpt"]

_enabled_options_of_reduce_as_one = [
    "inspect", "take_ckpt"]

_enabled_options_of_shuffle = [
    "inspect", "rank_zero", "take_ckpt"]

_enabled_options_of_distribute = [
    "nothreading", "inspect", "keep_open"]

_enabled_options_of_sort_locally = [
    "nothreading", "inspect", "key_as_rank"]

_enabled_options_of_sort = [
    "inspect"]
705 -class KVS():
706 """KVS. Note that there are dummy KVS'es which are temporarily 707 created to hold the C structure of the KVS passed to 708 mapper/reducer functions. A dummy KVS has None in its "mr" 709 attribute.""" 710 711 ## attributes: self._ckvs, self.mr 712
713 - def __init__(self, kmr_or_ckvs, keyty="opaque", valty="opaque"):
714 """Makes a KVS for a given KMR. A KVS is created by 715 specifying the datatypes stored in the key and the value, 716 using the keywords "key=" and "value=". The datatype name is 717 a string, one of "opaque", "cstring", "integer", and "float8". 718 Thus, most mappers and reducers (precisely, the ones which 719 accepts a function argument) take keyword arguments with the 720 defaults key="opaque" and value="opaque". The datatypes 721 affects the sorting order. Do not call constructors directly, 722 but via KMR.make_kvs().""" 723 724 self.mr = None 725 726 """mr attribute holds a KMR context object. Note that mr is 727 not accessible from mapping/reducing functions.""" 728 729 if isinstance(kmr_or_ckvs, KMR): 730 kf = _field_name_type_map[keyty] 731 vf = _field_name_type_map[valty] 732 top = inspect.currentframe().f_back 733 self.mr = kmr_or_ckvs 734 (f, l, n) = _make_frame_info(top) 735 self._ckvs = kmrso.kmr_create_kvs7( 736 self.mr._ckmr, kf, vf, _c_option(), f, l, n) 737 elif isinstance(kmr_or_ckvs, _c_pointer): 738 ## Return a dummy KVS. 739 self.mr = None 740 self._ckvs = kmr_or_ckvs 741 else: 742 raise Exception("Bad call to kvs constructor")
743
744 - def __del__(self):
745 if ((not self._is_dummy()) and (not _c_null_pointer(self._ckvs))): 746 self.free() 747 return
748
749 - def free(self):
750 """Finishes the C part of a KVS.""" 751 752 if (self._is_dummy()): 753 raise Exception("Bad call to free_kvs on dummy KVS") 754 elif (_c_null_pointer(self._ckvs)): 755 raise Exception("Bad call to free_kvs on freed KVS") 756 elif ((not self.mr is None) and self.mr._dismissed): 757 ## Do not free when KMR object is dismissed. 758 pass 759 else: 760 kmrso.kmr_free_kvs(self._ckvs) 761 self._ckvs = _c_null_pointer_value 762 return self
763
764 - def _is_dummy(self):
765 return (self.mr is None)
766
767 - def _consume(self):
768 """Releases a now dangling C pointer.""" 769 770 self._ckvs = _c_null_pointer_value
771
772 - def _encode_content(self, o, key_or_value):
773 """Marshalls an object with regard to the field type. It 774 retuns a 3-tuple, with length, value-union, and the 3nd to 775 keep a reference to a buffer.""" 776 777 kvty = self.get_field_type(key_or_value) 778 u = _c_unitsized() 779 if (kvty == "opaque"): 780 data = cPickle.dumps(o, cpickle_protocol) 781 u.p = data 782 return (len(data), u, data) 783 elif (kvty == "cstring"): 784 if (not isinstance(o, str)): 785 raise Exception("Not 8-bit string for cstring: %s" % o) 786 ## (Add null for C string). 787 data = ((o + "\0") if force_null_terminate_in_cstring else o) 788 u.p = data 789 return (len(data), u, data) 790 elif (kvty == "integer"): 791 u.i = o 792 return (ctypes.sizeof(_c_long), u, None) 793 elif (kvty == "float8"): 794 u.d = o 795 return (ctypes.sizeof(_c_double), u, None) 796 else: 797 raise Exception("Bad field type: %s" % kvty)
798
799 - def _decode_content(self, siz, u, key_or_value):
800 """Unmarshalls an object with regard to the field type. It 801 returns integer 0 when the length is 0 (it is for a dummy 802 key-value used in kmr_map_once() etc).""" 803 804 if (siz == 0): 805 return 0 806 else: 807 kvty = self.get_field_type(key_or_value) 808 if (kvty == "opaque"): 809 s = ctypes.string_at(u.p, siz) 810 o = cPickle.loads(s) 811 return o 812 elif (kvty == "cstring"): 813 ## (Delete null added for C string). 814 siz1 = ((siz - 1) if force_null_terminate_in_cstring else siz) 815 s = ctypes.string_at(u.p, siz1) 816 return s 817 elif (kvty == "integer"): 818 return u.i 819 elif (kvty == "float8"): 820 return u.d 821 else: 822 raise Exception("Bad field type: %s" % kvty)
823
824 - def get_field_type(self, key_or_value):
825 """Get a field type of a KVS.""" 826 827 if (_c_null_pointer(self._ckvs)): 828 raise Exception("Bad KVS (null C-object)") 829 if (key_or_value == "key"): 830 kvty = kmrso.kmr_get_key_type_ff(self._ckvs) 831 elif (key_or_value == "value"): 832 kvty = kmrso.kmr_get_value_type_ff(self._ckvs) 833 else: 834 raise Exception("Bad field %s" % key_or_value) 835 if (kvty == _kv_bad): 836 raise Exception("Bad field type value %d in KVS" % kvty) 837 else: 838 return _field_type_name_map[kvty]
839
840 - def add(self, key, val):
841 """Adds a key-value pair.""" 842 843 self.add_kv(key, val) 844 return
845
846 - def add_kv(self, key, val):
847 """Adds a key-value pair.""" 848 849 ## Note it keeps the created string until kmr_add_kv(), 850 ## because kvbox does not hold the references. 851 (klen, k, ks) = self._encode_content(key, "key") 852 (vlen, v, vs) = self._encode_content(val, "value") 853 cbox = _c_kvbox().set(klen, k, vlen, v) 854 kmrso.kmr_add_kv(self._ckvs, cbox) 855 return
856
857 - def add_kv_done(self):
858 """Finishes adding key-value pairs.""" 859 860 kmrso.kmr_add_kv_done(self._ckvs) 861 return
862
863 - def get_element_count(self):
864 """Gets the total number of key-value pairs.""" 865 866 c = _c_long(0) 867 kmrso.kmr_get_element_count(self._ckvs, ctypes.byref(c)) 868 return c.value
869
870 - def local_element_count(self):
871 """Gets the number of key-value pairs locally.""" 872 873 c = _c_long(0) 874 kmrso.kmr_local_element_count(self._ckvs, ctypes.byref(c)) 875 return c.value
876
877 - def map(self, fn, **mopts):
878 """Maps simply.""" 879 880 (keyty, valty, mkkvo) = _get_options(mopts, True) 881 cmopts = _c_option(mopts, _enabled_options_of_map) 882 cfn = _wrap_mapfn(fn) 883 ckvi = self._ckvs 884 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 885 ckvo = (kvo._ckvs if (kvo is not None) else None) 886 (f, l, n) = _make_frame_info(inspect.currentframe().f_back) 887 kmrso.kmr_map9(0, ckvi, ckvo, 0, cmopts, cfn, *(f, l, n)) 888 if (cmopts.inspect == 0): self._consume() 889 return kvo
890
891 - def map_once(self, rank_zero_only, fn, **mopts):
892 """Maps once with a dummy key-value pair.""" 893 894 ## It needs dummy input; Never inspects. 895 (keyty, valty, mkkvo) = _get_options(mopts, True) 896 cmopts = _c_option(mopts, _enabled_options_of_map_once) 897 cfn = _wrap_mapfn(fn) 898 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 899 ckvo = (kvo._ckvs if (kvo is not None) else None) 900 kmrso.kmr_map_once(ckvo, 0, cmopts, rank_zero_only, cfn) 901 return kvo
902
903 - def map_on_rank_zero(self, fn, **mopts):
904 """Maps on rank0 only.""" 905 906 ## It needs dummy input. 907 return self.map_once(True, fn, *mopts)
908
909 - def map_rank_by_rank(self, fn, **mopts):
910 """Maps sequentially with rank by rank for debugging.""" 911 912 (keyty, valty, mkkvo) = _get_options(mopts, True) 913 cmopts = _c_option(mopts, _enabled_options_of_map) 914 cfn = _wrap_mapfn(fn) 915 ckvi = self._ckvs 916 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 917 ckvo = (kvo._ckvs if (kvo is not None) else None) 918 kmrso.kmr_map_rank_by_rank(ckvi, ckvo, 0, cmopts, cfn) 919 if (cmopts.inspect == 0): self._consume() 920 return kvo
921
922 - def map_for_some(self, fn, **mopts):
923 """Maps until some key-value are added.""" 924 925 (keyty, valty, mkkvo) = _get_options(mopts, True) 926 cmopts = _c_option(mopts, _enabled_options_of_map) 927 ckvi = self._ckvs 928 cfn = _wrap_mapfn(fn) 929 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 930 ckvo = (kvo._ckvs if (kvo is not None) else None) 931 kmrso.kmr_map_for_some(ckvi, ckvo, 0, cmopts, cfn) 932 if (cmopts.inspect == 0): self._consume() 933 return kvo
934
935 - def map_ms(self, fn, **mopts):
936 """Maps in master-slave mode.""" 937 938 ## Its call is repeated until True (assuming MPI_SUCCESS==0). 939 (keyty, valty, mkkvo) = _get_options(mopts, True) 940 cmopts = _c_option(mopts, _enabled_options_of_map_ms) 941 cfn = _wrap_mapfn(fn) 942 ckvi = self._ckvs 943 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 944 ckvo = (kvo._ckvs if (kvo is not None) else None) 945 rr = 1 946 while (rr != 0): 947 rr = kmrso.kmr_map_ms(ckvi, ckvo, 0, cmopts, cfn) 948 self._consume() 949 return kvo
950
951 - def map_ms_commands(self, fn, **xopts):
952 """Maps in master-slave mode, and runs serial commands.""" 953 954 (sopts, mopts) = _filter_spawn_options(xopts) 955 (keyty, valty, mkkvo) = _get_options(mopts, True) 956 cmopts = _c_option(mopts, _enabled_options_of_map_ms) 957 csopts = _c_spawn_option(sopts) 958 cfn = _wrap_mapfn(fn) 959 ckvi = self._ckvs 960 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 961 ckvo = (kvo._ckvs if (kvo is not None) else None) 962 rr = 1 963 while (rr != 0): 964 rr = kmrso.kmr_map_ms_commands(ckvi, ckvo, 0, cmopts, csopts, cfn) 965 self._consume() 966 return kvo
967
968 - def map_via_spawn(self, fn, **xopts):
969 """Maps on processes started by MPI_Comm_spawn().""" 970 971 (sopts, mopts) = _filter_spawn_options(xopts) 972 (keyty, valty, mkkvo) = _get_options(mopts, True) 973 cmopts = _c_option(mopts, _enabled_options_of_map) 974 csopts = _c_spawn_option(sopts) 975 cfn = _wrap_mapfn(fn) 976 ckvi = self._ckvs 977 kvo = (KVS(self.mr, keyty, valty) if mkkvo else None) 978 ckvo = (kvo._ckvs if (kvo is not None) else None) 979 kmrso.kmr_map_via_spawn(ckvi, ckvo, 0, _mpi_info_null, csopts, cfn) 980 self._consume() 981 return kvo
982
def map_processes(self, nonmpi, fn, **sopts):
    """Maps on processes started by MPI_Comm_spawn().  NONMPI is
    passed as is to kmr_map_processes().  This KVS is consumed after
    the call.  It returns the output KVS, or None when no output KVS
    is made."""

    (key_type, value_type, wants_output) = _get_options(sopts, True)
    c_spawn_options = _c_spawn_option(sopts)
    c_mapfn = _wrap_mapfn(fn)
    c_input = self._ckvs
    if wants_output:
        output = KVS(self.mr, key_type, value_type)
        c_output = output._ckvs
    else:
        output = None
        c_output = None
    kmrso.kmr_map_processes(nonmpi, c_input, c_output, 0, _mpi_info_null,
                            c_spawn_options, c_mapfn)
    self._consume()
    return output
996
def map_parallel_processes(self, fn, **sopts):
    """Maps on processes started by MPI_Comm_spawn().  It is a
    shorthand for map_processes() with NONMPI being False."""

    nonmpi = False
    return self.map_processes(nonmpi, fn, **sopts)
1001
def map_serial_processes(self, fn, **sopts):
    """Maps on processes started by MPI_Comm_spawn().  It is a
    shorthand for map_processes() with NONMPI being True."""

    nonmpi = True
    return self.map_processes(nonmpi, fn, **sopts)
1006
def reduce(self, fn, **mopts):
    """Reduces key-value pairs.  This KVS is consumed unless the
    inspect option is set.  It returns the output KVS, or None when
    no output KVS is made."""

    (key_type, value_type, wants_output) = _get_options(mopts, True)
    c_options = _c_option(mopts, _enabled_options_of_reduce)
    c_redfn = _wrap_redfn(fn)
    c_input = self._ckvs
    if wants_output:
        output = KVS(self.mr, key_type, value_type)
        c_output = output._ckvs
    else:
        output = None
        c_output = None
    ## (NOTE: It passes a frame of the caller of reduce.)
    (f, l, n) = _make_frame_info(inspect.currentframe().f_back)
    ## The leading zero differs from reduce_for_some, which passes one.
    kmrso.kmr_reduce9(0, c_input, c_output, 0, c_options, c_redfn, f, l, n)
    if (c_options.inspect == 0):
        self._consume()
    return output
1020
def reduce_as_one(self, fn, **mopts):
    """Reduces once as if all pairs had the same key.  This KVS is
    consumed unless the inspect option is set.  It returns the
    output KVS, or None when no output KVS is made."""

    (key_type, value_type, wants_output) = _get_options(mopts, True)
    c_options = _c_option(mopts, _enabled_options_of_reduce_as_one)
    c_redfn = _wrap_redfn(fn)
    c_input = self._ckvs
    if wants_output:
        output = KVS(self.mr, key_type, value_type)
        c_output = output._ckvs
    else:
        output = None
        c_output = None
    kmrso.kmr_reduce_as_one(c_input, c_output, 0, c_options, c_redfn)
    if (c_options.inspect == 0):
        self._consume()
    return output
1033
def reduce_for_some(self, fn, **mopts):
    """Reduces until some key-value are added.  This KVS is consumed
    unless the inspect option is set.  It returns the output KVS, or
    None when no output KVS is made."""

    (key_type, value_type, wants_output) = _get_options(mopts, True)
    c_options = _c_option(mopts, _enabled_options_of_reduce)
    c_redfn = _wrap_redfn(fn)
    c_input = self._ckvs
    if wants_output:
        output = KVS(self.mr, key_type, value_type)
        c_output = output._ckvs
    else:
        output = None
        c_output = None
    ## (NOTE: It passes a frame of reduce_for_some.)
    (f, l, n) = _make_frame_info(inspect.currentframe())
    ## The leading one differs from reduce, which passes zero.
    kmrso.kmr_reduce9(1, c_input, c_output, 0, c_options, c_redfn, f, l, n)
    if (c_options.inspect == 0):
        self._consume()
    return output
1048
def reverse(self, **mopts):
    """Makes a new pair by swapping the key and the value.  The
    output KVS is created with the field types of this KVS
    exchanged.  This KVS is consumed unless the inspect option is
    set."""

    key_type = self.get_field_type("key")
    value_type = self.get_field_type("value")
    (_, _, wants_output) = _get_options(mopts, False)
    c_options = _c_option(mopts, _enabled_options_of_map)
    ## An output KVS is mandatory for this operation.
    assert (wants_output is True)
    c_input = self._ckvs
    if wants_output:
        ## The value type comes first, because the fields are swapped.
        output = KVS(self.mr, value_type, key_type)
        c_output = output._ckvs
    else:
        output = None
        c_output = None
    kmrso.kmr_reverse(c_input, c_output, c_options)
    if (c_options.inspect == 0):
        self._consume()
    return output
1063
def shuffle(self, **mopts):
    """Shuffles key-value pairs.  The output KVS has the same field
    types as this KVS.  This KVS is consumed unless the inspect
    option is set.  It returns the output KVS, or None when no
    output KVS is made."""

    key_type = self.get_field_type("key")
    value_type = self.get_field_type("value")
    (_, _, wants_output) = _get_options(mopts, False)
    ## NOTE(review): The options are built with
    ## _enabled_options_of_reduce here, while replicate uses
    ## _enabled_options_of_shuffle -- confirm this is intended.
    c_options = _c_option(mopts, _enabled_options_of_reduce)
    c_input = self._ckvs
    if wants_output:
        output = KVS(self.mr, key_type, value_type)
        c_output = output._ckvs
    else:
        output = None
        c_output = None
    kmrso.kmr_shuffle(c_input, c_output, c_options)
    if (c_options.inspect == 0):
        self._consume()
    return output
1077
def replicate(self, **mopts):
    """Replicates key-value pairs to be visible on all ranks.  The
    output KVS has the same field types as this KVS.  This KVS is
    consumed unless the inspect option is set.  It returns the
    output KVS, or None when no output KVS is made."""

    key_type = self.get_field_type("key")
    value_type = self.get_field_type("value")
    (_, _, wants_output) = _get_options(mopts, False)
    c_options = _c_option(mopts, _enabled_options_of_shuffle)
    c_input = self._ckvs
    if wants_output:
        output = KVS(self.mr, key_type, value_type)
        c_output = output._ckvs
    else:
        output = None
        c_output = None
    kmrso.kmr_replicate(c_input, c_output, c_options)
    if (c_options.inspect == 0):
        self._consume()
    return output
1091
def distribute(self, cyclic, **mopts):
    """Distributes pairs approximately evenly to ranks.  CYCLIC is
    passed as is to kmr_distribute().  The output KVS has the same
    field types as this KVS.  This KVS is consumed unless the
    inspect option is set.  It returns the output KVS, or None when
    no output KVS is made."""

    key_type = self.get_field_type("key")
    value_type = self.get_field_type("value")
    (_, _, wants_output) = _get_options(mopts, False)
    c_options = _c_option(mopts, _enabled_options_of_distribute)
    c_input = self._ckvs
    if wants_output:
        output = KVS(self.mr, key_type, value_type)
        c_output = output._ckvs
    else:
        output = None
        c_output = None
    kmrso.kmr_distribute(c_input, c_output, cyclic, c_options)
    if (c_options.inspect == 0):
        self._consume()
    return output
1105
def sort_locally(self, shuffling, **mopts):
    """Reorders key-value pairs in a single rank.  SHUFFLING is
    passed as is to kmr_sort_locally().  The output KVS has the same
    field types as this KVS.  This KVS is consumed unless the
    inspect option is set.  It returns the output KVS, or None when
    no output KVS is made."""

    key_type = self.get_field_type("key")
    value_type = self.get_field_type("value")
    (_, _, wants_output) = _get_options(mopts, False)
    c_options = _c_option(mopts, _enabled_options_of_sort_locally)
    c_input = self._ckvs
    if wants_output:
        output = KVS(self.mr, key_type, value_type)
        c_output = output._ckvs
    else:
        output = None
        c_output = None
    kmrso.kmr_sort_locally(c_input, c_output, shuffling, c_options)
    if (c_options.inspect == 0):
        self._consume()
    return output
1119
def sort(self, **mopts):
    """Sorts a KVS globally.  The output KVS has the same field
    types as this KVS.  This KVS is consumed unless the inspect
    option is set.  It returns the output KVS, or None when no
    output KVS is made."""

    key_type = self.get_field_type("key")
    value_type = self.get_field_type("value")
    (_, _, wants_output) = _get_options(mopts, False)
    c_options = _c_option(mopts, _enabled_options_of_sort)
    c_input = self._ckvs
    if wants_output:
        output = KVS(self.mr, key_type, value_type)
        c_output = output._ckvs
    else:
        output = None
        c_output = None
    kmrso.kmr_sort(c_input, c_output, c_options)
    if (c_options.inspect == 0):
        self._consume()
    return output
1133
def concatenate(self, *morekvs):
    """Concatenates a number of KVS'es to one.  This KVS comes
    first in the vector passed to kmr_concatenate_kvs(), followed by
    MOREKVS in the given order.  The output KVS takes the field
    types of this KVS.  All the inputs are consumed: MOREKVS first,
    and then this KVS."""

    key_type = self.get_field_type("key")
    value_type = self.get_field_type("value")
    count = (1 + len(morekvs))
    c_inputs = (_c_kvs * count)()
    c_inputs[0] = self._ckvs
    ## Slot zero is taken by this KVS, so the others start at one.
    for (slot, other) in enumerate(morekvs, 1):
        c_inputs[slot] = other._ckvs
    c_count = _c_int(count)
    output = KVS(self.mr, key_type, value_type)
    kmrso.kmr_concatenate_kvs(c_inputs, c_count, output._ckvs, _c_option())
    for other in morekvs:
        other._consume()
    self._consume()
    return output
1152
def read_files_reassemble(self, filename, color, offset, bytes_):
    """Reassembles files reading by ranks.  FILENAME, COLOR, OFFSET,
    and BYTES_ are passed as is to kmr_read_files_reassemble().  It
    returns a bytearray holding a copy of the buffer filled by the C
    routine; the C buffer is released with kmr_mfree() before
    returning.  An empty bytearray is returned when the C routine
    leaves the buffer pointer null."""

    buf = _c_void_p()
    siz = _c_uint64(0)
    kmrso.kmr_read_files_reassemble(
        self.mr._ckmr, filename, color, offset, bytes_,
        ctypes.byref(buf), ctypes.byref(siz))
    addr = buf.value
    if (addr is None):
        ## A null pointer appears as None (assuming _c_void_p is
        ## ctypes.c_void_p).  There is nothing to copy nor to free,
        ## and from_address() would raise TypeError on None.
        return bytearray()
    try:
        ptr = (_c_ubyte * siz.value).from_address(addr)
        data = bytearray(ptr)
    finally:
        ## The C buffer is freed even when the copying fails.
        kmrso.kmr_mfree(addr, siz.value)
    return data
1166
def read_file_by_segments(self, filename, color):
    """Reads one file by segments and reassembles.  FILENAME and
    COLOR are passed as is to kmr_read_file_by_segments().  It
    returns a bytearray holding a copy of the buffer filled by the C
    routine; the C buffer is released with kmr_mfree() before
    returning.  An empty bytearray is returned when the C routine
    leaves the buffer pointer null."""

    buf = _c_void_p()
    siz = _c_uint64(0)
    kmrso.kmr_read_file_by_segments(
        self.mr._ckmr, filename, color,
        ctypes.byref(buf), ctypes.byref(siz))
    addr = buf.value
    if (addr is None):
        ## A null pointer appears as None (assuming _c_void_p is
        ## ctypes.c_void_p).  There is nothing to copy nor to free,
        ## and from_address() would raise TypeError on None.
        return bytearray()
    try:
        ptr = (_c_ubyte * siz.value).from_address(addr)
        data = bytearray(ptr)
    finally:
        ## The C buffer is freed even when the copying fails.
        kmrso.kmr_mfree(addr, siz.value)
    return data
1180
def save(self):
    """Packs locally the contents of a KVS to a byte array.  It
    returns a bytearray holding a copy of the buffer filled by
    kmr_save_kvs(); the C buffer is released with kmr_mfree() before
    returning.  An empty bytearray is returned when the C routine
    leaves the buffer pointer null."""

    buf = _c_void_p(0)
    siz = _c_size_t(0)
    kmrso.kmr_save_kvs(self._ckvs, ctypes.byref(buf), ctypes.byref(siz),
                       _c_option())
    addr = buf.value
    if (addr is None):
        ## A null pointer appears as None (assuming _c_void_p is
        ## ctypes.c_void_p).  There is nothing to copy nor to free,
        ## and from_address() would raise TypeError on None.
        return bytearray()
    try:
        ptr = (_c_ubyte * siz.value).from_address(addr)
        data = bytearray(ptr)
    finally:
        ## The C buffer is freed even when the copying fails.
        kmrso.kmr_mfree(addr, siz.value)
    return data
1193
def restore(self, data):
    """Unpacks locally the contents of a KVS from a byte array.  It
    makes a new KVS with both fields "opaque", passes the bytes of
    DATA to kmr_restore_kvs(), and returns the new KVS.  DATA is
    accessed via from_buffer(), so it shall be a writable buffer
    such as a bytearray."""

    restored = KVS(self.mr, "opaque", "opaque")
    length = len(data)
    ## It makes a C view sharing the memory of DATA (no copying).
    c_bytes = (_c_ubyte * length).from_buffer(data)
    kmrso.kmr_restore_kvs(restored._ckvs, c_bytes, length, _c_option())
    return restored
1202
def fin():
    """Finishes using KMR4PY.  It calls kmr_fin() of the C library
    and returns nothing."""

    kmrso.kmr_fin()
1208
def listify(kvs):
    """Returns an array of LOCAL contents.  It maps over KVS with
    the inspect option set and without an output KVS, and stores
    each key-value pair in a list at the index given to the
    map-function."""

    contents = [None] * kvs.local_element_count()

    def store(kv, kvi, kvo, i, *_data):
        ## It records a pair at its position; zero means success.
        contents[i] = kv
        return 0

    result = kvs.map(store, output=False, inspect=True)
    assert (result is None)
    return contents
def _check_ctypes_values():
    """Checks if ctypes values are properly used.  It raises an
    Exception when _c_null_pointer() does not accept
    _c_null_pointer_value."""

    if _c_null_pointer(_c_null_pointer_value):
        return
    raise Exception("BAD: C null pointer has a wrong value.")
1225
def _check_passing_options():
    """Checks if the options are passed properly from Python to C.
    For each option structure, it sets one field at a time, lets the
    matching C stringify routine name the option, and raises an
    Exception when the name differs from the field name.  The
    fields "gap16" and "gap32" are not checked."""

    skipped = ("gap16", "gap32")
    checks = [
        (_c_option, kmrso.kmr_stringify_options),
        (_c_file_option, kmrso.kmr_stringify_file_options),
        (_c_spawn_option, kmrso.kmr_stringify_spawn_options)]
    for (option, stringify) in checks:
        for (o, _, _) in option._fields_:
            if (o in skipped):
                continue
            copts = option({o : 1})
            s = stringify(copts)
            if (o != s):
                raise Exception("BAD: %s != %s" % (str(o), str(s)))
1241 1242 # Copyright (C) 2012-2016 RIKEN AICS 1243 # This library is distributed WITHOUT ANY WARRANTY. This library can be 1244 # redistributed and/or modified under the terms of the BSD 2-Clause License. 1245