KMR
kmrrungenscript.in.py
Go to the documentation of this file.
1 #!/usr/bin/python
2 # -*- coding: utf-8 -*-
3 
4 # Copyright (C) 2012-2016 RIKEN AICS
5 
6 ## \file kmrrungenscript.in.py KMRRUN Job-Script Generator.
7 
8 import sys
9 import os
10 import re
11 from optparse import OptionParser
12 
# Root directory of the KMR installation.  select_scheduler() looks for the
# kmrrun executable and the job-script templates under kmrhome + '/lib'.
# NOTE(review): '@KMRHOME@' looks like a placeholder that is substituted
# when this ".in.py" file is configured/installed -- confirm against the
# build system.
13 kmrhome = '@KMRHOME@'
14 
## Checks file existence.
#  If the path does not exist, an error message is written to stderr and
#  the program terminates with exit status 1.
#  @param path  file or directory path to check.
#  @return      the given path, unchanged, when it exists.

def _check_exist(path):
    if not os.path.exists(path):
        sys.stderr.write('Error: file or dir "%s" is not exist.\n' % path)
        # A bare sys.exit() reports success (status 0); use a non-zero
        # status so that callers of the generator can detect the failure.
        sys.exit(1)
    return path
24 
25 
## Checks that the command of the specified command line exists.
#  Only the first word of the command line (the program path) is checked;
#  the check itself is delegated to _check_exist().  A command line that is
#  None, empty or blank is returned unchanged without any check, so that
#  optional commands (kv generator, reducer) may be passed in directly.
#  @param cmdline  a command line to be executed, or None
#  @param sched    string that represents scheduler type
#  @return         for the 'K' scheduler (case-insensitive), the command
#                  line with its program rewritten to './<basename>';
#                  otherwise the command line unchanged.

def check_cmdline(cmdline, sched):
    # Unset/blank command: nothing to verify and nothing to rewrite.
    words = (cmdline or '').split()
    if not words:
        return cmdline

    _check_exist(words[0])
    if sched.upper() != 'K':
        return cmdline

    # k_scheduler() stages the program in under its base name, so the job
    # has to invoke it relative to the current directory.
    words[0] = './' + os.path.basename(words[0])
    return ' '.join(words)
38 
## Checks that the input directory exists.
#  The existence check is delegated to _check_exist().
#  @param dirname  name of directory where input files are located
#  @param sched    string that represents scheduler type
#  @return         for the 'K' scheduler (case-insensitive),
#                  './<basename of dirname>' with trailing whitespace and
#                  slashes removed first; otherwise dirname unchanged.

def check_indir(dirname, sched):
    _check_exist(dirname)
    if sched.upper() != 'K':
        return dirname
    # Drop trailing blanks first, then trailing slashes, so that basename()
    # yields the last path component.
    trimmed = dirname.rstrip()
    trimmed = trimmed.rstrip('/')
    return './' + os.path.basename(trimmed)
50 
51 
## Reports a restart-check error on stderr and terminates with status 1.
#  @param message  error text; a newline is appended when it is written.

def _restart_error(message):
    sys.stderr.write(message + '\n')
    sys.exit(1)


## Check restart mode.
#  Counts the checkpoint directories in the current directory whose names
#  are '<prefix><digits>' and validates them against the process count
#  recorded in '<prefix>00000/nprocs' (content of the form 'key=<number>').
#  The prefix is restart_basename + '.' for the 'K' scheduler and 'ckptdir'
#  for any other scheduler.  Nothing is checked when no checkpoint directory
#  is found, or when restart_basename is None on 'K'.
#
#  The program terminates with an error when the nprocs file is missing or
#  malformed, when the number of checkpoint directories differs from the
#  recorded process count, or when more processes are requested than were
#  used before.  When fewer processes are requested, a "Reduction mode"
#  notice is written to stderr.
#  @param restart_basename  prefix of checkpoint directory name
#  @param procstr           string that represents process number
#  @param sched             string that represents scheduler type

def check_restart(restart_basename, procstr, sched):
    if sched.upper() == 'K':
        if restart_basename is None:
            return
        ckpt_prefix = restart_basename + '.'
    else:
        ckpt_prefix = 'ckptdir'

    # The prefix is literal text: escape it so that the '.' separator (and
    # any other regex metacharacter in the base name) only matches itself.
    pattern = re.compile(r'^%s\d+$' % re.escape(ckpt_prefix))
    count = len([entry for entry in os.listdir('./') if pattern.match(entry)])
    if count == 0:
        return

    nprocs_file = ckpt_prefix + '00000/nprocs'
    if not os.path.exists(nprocs_file):
        _restart_error('Error: Checkpoint nproc file %s not exit.\n'
                       % nprocs_file)

    nprocs_fp = open(nprocs_file)
    try:
        nprocs_text = nprocs_fp.read()
    finally:
        nprocs_fp.close()
    try:
        preproc = int(nprocs_text.split('=')[1])
    except (IndexError, ValueError):
        _restart_error('Error: Checkpoint nproc file %s is malformed.\n'
                       % nprocs_file)

    if count != preproc:
        _restart_error('Error: Do not match number of checkpoint file and '
                       'executed process. ***\n')
    proc = k_node_to_int(procstr)
    if proc > preproc:
        _restart_error('Error: On restart, increasing number of process is '
                       'not supported. ***\n')
    if count > proc:
        sys.stderr.write("*** Reduction mode. ***\n")
92 
93 
## Parse K node declaration into an integer.
#  Accepts one to three decimal dimensions separated by 'x' at the start of
#  the string (for example '8', '2x3' or '2x3x4:strict') and returns their
#  product.  Text following the matched dimensions is ignored.
#  @param shape_str  string that represents K node shape
#  @return           product of the dimensions as an integer
#  @exception ValueError  if shape_str does not start with a decimal number

def k_node_to_int(shape_str):
    m = re.match(r"(\d+)x?(\d+)?x?(\d+)?(:strict)?", shape_str)
    if m is None:
        # Without this check the failure surfaces as an AttributeError on
        # None, which does not tell the user what was wrong.
        raise ValueError('invalid node shape: "%s"' % shape_str)
    prdct = 1
    for dim in m.groups()[0:3]:
        # Dimensions that were not given are reported as None.
        if dim:
            prdct *= int(dim)
    return prdct
104 
105 
## Reads the whole content of a text file and closes it afterwards.
#  @param path  path of the file to read
#  @return      content of the file as a string

def _k_read_file(path):
    fp = open(path)
    try:
        return fp.read()
    finally:
        fp.close()


## Generates job-script for K.
#  Builds the stage-in directives (mapper, kv generator and reducer
#  programs, the input directory and, on restart, the previous checkpoint
#  directories), the stage-out directives (a placeholder comment plus, when
#  checkpointing is enabled or a restart is requested, one directive per
#  process for the checkpoint directories) and the mpiexec command line,
#  then fills them into the template.
#  @param name name of the job
#  @param queue queue to submit job
#  @param rsctime resource time limit
#  @param node number of node to execute.
#  @param kmrrun_path path to kmrrun command
#  @param kmrrun_parameter parameter for kmrrun
#  @param template_path path for template file
#  @param shape mpi process shape
#  @param proc number of execute proc
#  @param mapper mapper command line
#  @param kvgen kv generator command line
#  @param reducer reducer command line
#  @param indir directory where inputs are located(staged-in)
#  @param ckpt enable checkpoint
#  @param restart_basename prefix of checkpoint directory name
#  @return the job script text

def k_scheduler(name, queue, rsctime, node, kmrrun_path, kmrrun_parameter,
                template_path, shape, proc, mapper, kvgen, reducer, indir,
                ckpt, restart_basename):
    nprocs = k_node_to_int(proc)

    # Stage in section: each program is staged in under its base name.
    stgin_lines = []
    for cmdline in (mapper, kvgen, reducer):
        words = (cmdline or '').split()
        if words:
            stgin_lines.append('#PJM --stgin "%s %s"'
                               % (words[0], os.path.basename(words[0])))
    indir_stgin = './' + os.path.basename(indir.rstrip().rstrip('/'))
    stgin_lines.append('#PJM --stgin "%s/* %s/"' % (indir, indir_stgin))
    # Stage in ckpt files: one directory per process of the previous run,
    # whose count is recorded in '<basename>.00000/nprocs'.
    if restart_basename:
        fname = os.path.basename(restart_basename) + '.00000/nprocs'
        prev_nprocs = int(_k_read_file(fname).split('=')[1])
        for rank in range(prev_nprocs):
            stgin_lines.append('#PJM --stgin "./%s.%05d/* ./ckptdir%05d/"'
                               % (restart_basename, rank, rank))

    # Stage out section: the user has to add directives for the outputs.
    stgout_lines = ["#\n# !!WRITE STGOUT HERE!!\n#"]
    # Stage out ckpt files ('%%j' leaves a literal '%j' in the directive).
    if ckpt or restart_basename:
        for rank in range(nprocs):
            stgout_lines.append('#PJM --stgout "./ckptdir%05d/* '
                                './ckptdir_%%j.%05d/"' % (rank, rank))

    execstr = 'mpiexec -n %d ./kmrrun %s' % (nprocs, kmrrun_parameter)

    template = _k_read_file(template_path)
    return template % {'NAME': name, 'QUEUE': queue, 'NODE': node,
                       'RSCTIME': rsctime, 'KMRRUN': kmrrun_path,
                       'SHAPE': shape, 'PROC': proc,
                       'DATASTGIN': '\n'.join(stgin_lines),
                       'DATASTGOUT': '\n'.join(stgout_lines),
                       'EXEC': execstr}
173 
174 
## Generates job-script for FOCUS supercomputer
#  Reads the template file and fills in its %(NAME)s, %(QUEUE)s, %(NODE)s,
#  %(RSCTIME)s, %(KMRRUN)s and %(KMRRUN_PARAM)s placeholders.
#  @param name name of the job
#  @param queue queue to submit job
#  @param rsctime resource time limit
#  @param node number of MPI processes to use
#  @param kmrrun_path path to kmrrun command
#  @param kmrrun_parameter parameter for kmrrun
#  @param template_path path for template file
#  @return the job script text

def focus_scheduler(name, queue, rsctime, node, kmrrun_path, kmrrun_parameter,
                    template_path):
    # Close the template explicitly instead of relying on garbage
    # collection of the file object.
    template_fp = open(template_path)
    try:
        template = template_fp.read()
    finally:
        template_fp.close()
    return template % {'NAME': name, 'QUEUE': queue, 'NODE': node,
                       'RSCTIME': rsctime, 'KMRRUN': kmrrun_path,
                       'KMRRUN_PARAM': kmrrun_parameter}
190 
191 
## Selects job-scheduler.
#  Locates kmrrun and the job-script templates, checks the commands and the
#  input directory given in the options, builds the kmrrun parameter string
#  and generates the job script for the requested scheduler.  The script is
#  written to opts.scrfile, or to stdout when no file is given.  The program
#  terminates with status 1 when kmrrun cannot be found or the scheduler is
#  unknown.
#  @param opts Options to the generator
#  @param sched scheduler

def select_scheduler(opts, sched):
    # find kmrrun and its job-scheduler templates
    template_dir = kmrhome + '/lib'
    kmrrun_path = template_dir + '/kmrrun'
    if not os.path.exists(kmrrun_path):
        # kmrrun does not exist in the install directory.  In this case,
        # We assume that we are working in KMRSRC/cmd directory.
        template_dir = '.'
        kmrrun_path = template_dir + '/../kmrrun/kmrrun'
        if not os.path.exists(kmrrun_path):
            sys.stderr.write('Error: could not find kmrrun utility.\n')
            sys.exit(1)

    # set parameters
    queue = opts.queue
    node = opts.node
    # Read from the 'opts' argument, not from a module-level variable, so
    # that the function also works when it is not called from the script's
    # main routine.
    rsctime = opts.rsctime

    # Only commands that were actually given are checked; the kv generator
    # and the reducer are optional.
    params = []
    if opts.taskproc:
        params.append('-n %s' % (opts.taskproc))
    if opts.mapper:
        params.append('-m "%s"' % (check_cmdline(opts.mapper, sched)))
    if opts.kvgen:
        params.append('-k "%s"' % (check_cmdline(opts.kvgen, sched)))
    if opts.reducer:
        params.append('-r "%s"' % (check_cmdline(opts.reducer, sched)))
    if opts.ckpt or opts.restart:
        params.append('--ckpt')
    params.append(check_indir(opts.indir, sched))
    kmrrun_parameter = ' '.join(params)

    name = 'kmrrun_job'
    if opts.scrfile:
        name = opts.scrfile

    sched_name = sched.upper()
    if sched_name == 'K':
        script = k_scheduler(name, queue, rsctime, node, kmrrun_path,
                             kmrrun_parameter,
                             template_dir + '/kmrrungenscript.template.k',
                             opts.shape, opts.proc, opts.mapper, opts.kvgen,
                             opts.reducer, opts.indir,
                             opts.ckpt, opts.restart)
    elif sched_name == 'FOCUS':
        script = focus_scheduler(name, queue, rsctime, node, kmrrun_path,
                                 kmrrun_parameter,
                                 template_dir + '/kmrrungenscript.template.focus')
    # for other schedulers...
    else:
        sys.stderr.write('Unknown scheduler\n')
        sys.exit(1)

    # output script
    if opts.scrfile is None:
        sys.stdout.write(script + '\n')
    else:
        out = open(opts.scrfile, "w")
        try:
            out.write(script + '\n')
        finally:
            out.close()
256 
257 
## Warn to write Stage-out section.
#  Writes a reminder to stderr when the scheduler is 'K' (compared
#  case-insensitively, like the other scheduler checks in this file).
#  When checkpointing or restart is requested, the reminder also states
#  that only checkpoint files are staged out by the generated script.
#  @param opts Options to the generator

def warn_stageout(opts):
    if opts.sched.upper() != 'K':
        return

    rule = "#########################################################################"
    lines = [rule,
             "Don't forget to write stage-out directives for MapReduce "
             "output files."]
    if opts.ckpt or opts.restart:
        lines.append("A job script generated by this program stages-out "
                     "only checkpoint files.")
    lines.append(rule)
    # The banner is followed by one empty line.
    sys.stderr.write('\n'.join(lines) + '\n\n')
278 
279 
## kmrrungenscript main routine.
#  Parses the command line, validates the option combination, optionally
#  checks the checkpoint directories for a restart, generates the job
#  script and finally reminds the user about stage-out directives.
#  It works on Python 2.4 or later.

if __name__ == "__main__":

    usage = "usage: %prog [options] -m mapper [-k keygener -r reducer]"
    parser = OptionParser(usage)

    parser.add_option("-q", "--queue",
                      dest="queue",
                      type="string",
                      help="queue to submit your job",
                      metavar="'string'",
                      default='None')

    parser.add_option("-t", "--resource-time",
                      dest="rsctime",
                      type="string",
                      help="job execution time (default is '00:10:00')",
                      metavar="'string'",
                      default='00:10:00')

    parser.add_option("-e", "--number-of-node",
                      dest="node",
                      type="string",
                      help="number of node (default is '12')",
                      metavar="'string'",
                      default='12')

    parser.add_option("-s", "--shape",
                      dest="shape",
                      type="string",
                      help="mpi process shape. "
                      "Valid only on K scheduler. (default is '1')",
                      metavar="'string'",
                      default='1')

    parser.add_option("-p", "--proc",
                      dest="proc",
                      type="string",
                      help="number of mpi processes. "
                      "Valid only on K scheduler. (default is '8')",
                      metavar="'string'",
                      default='8')

    parser.add_option("-d", "--inputdir",
                      dest="indir",
                      type="string",
                      help="input file directory. "
                      "When used on K computer, this directory should be one "
                      "located in K global storage that is staged-in. "
                      "(default is './input')",
                      metavar="'string'",
                      default='./input')

    parser.add_option("-n", "--task-proc",
                      dest="taskproc",
                      type="string",
                      help="number of processes to run each mapper/reducer "
                      "(default is 1)",
                      metavar="number",
                      default=1)

    parser.add_option("-m", "--mapper",
                      dest="mapper",
                      type="string",
                      help="mapper command path and its arguments",
                      metavar="'string'")

    parser.add_option("-k", "--kvgen",
                      dest="kvgen",
                      type="string",
                      help="kv generator command path and its arguments",
                      metavar="'string'")

    parser.add_option("-r", "--reducer",
                      dest="reducer",
                      type="string",
                      help="reducer command path and its arguments",
                      metavar="'string'")

    parser.add_option("-C", "--ckpt",
                      dest="ckpt",
                      action="store_true",
                      help="enable Checkpoint/Restart (default is false)",
                      default=False)

    parser.add_option("-R", "--restart-filename",
                      dest="restart",
                      type="string",
                      help="specify prefix of directories where checkpoint "
                      "files are located. "
                      "This option should be given when restarting on "
                      "a system that requires staging. "
                      "Valid only on K scheduler.",
                      metavar="'string'")

    parser.add_option("-S", "--scheduler",
                      dest="sched",
                      type="string",
                      help="scheduler type. "
                      "Specify Scheduler 'K' or 'FOCUS'. "
                      "'K' supports K computer/FX10 and 'FOCUS' supports "
                      "Focus supercomputer. (default is 'K')",
                      metavar="'string'",
                      default='K')

    parser.add_option("-w", "--write-scriptfile",
                      dest="scrfile",
                      type="string",
                      help="output job script filename",
                      metavar="'string'")

    # 'options' stays a module-level name: code outside this block may
    # refer to it.
    (options, args) = parser.parse_args()

    # check parameters.
    if len(args) != 0:
        # Everything is passed through options; positional arguments are
        # not accepted.  parser.error() prints the usage and exits itself.
        parser.error("unexpected arguments: %s" % ' '.join(args))
    if not options.mapper:
        sys.stderr.write("Error: Mapper is not specified\n\n")
        sys.exit(1)
    if options.reducer and not options.kvgen:
        sys.stderr.write(
            "Error: Specify kv generator when reducer is specified\n\n")
        sys.exit(1)

    # NOTE(review): the restart check only runs when -C is given, although
    # the job script enables checkpointing for -R as well -- confirm whether
    # "-R without -C" should be checked too.
    if options.ckpt:
        # Scheduler names are compared case-insensitively, as in
        # select_scheduler().
        if options.sched.upper() == 'K':
            check_restart(options.restart, options.proc, 'K')
        else:
            check_restart(options.restart, '1', options.sched)

    select_scheduler(options, options.sched)
    warn_stageout(options)
429 
430 
431 # Copyright (C) 2012-2016 RIKEN AICS
432 # This library is distributed WITHOUT ANY WARRANTY. This library can be
433 # redistributed and/or modified under the terms of the BSD 2-Clause License.