import os
import re
import sys
from optparse import OptionParser
def _check_exist(path):
    """Abort the program if *path* does not exist locally.

    Args:
        path: File or directory path to check.

    Returns:
        *path* unchanged, so the check can be used inline.

    Exits:
        With status 1, after printing an error to stderr, when *path* is
        missing.
    """
    if not os.path.exists(path):
        sys.stderr.write('Error: file or dir "%s" is not exist.\n' % path)
        # Non-zero status so calling shells/scripts can detect the failure.
        sys.exit(1)
    return path
def check_cmdline(cmdline, sched):
    """Validate a user command line and adapt it to the target scheduler.

    The command (first whitespace-separated token) must exist locally.  For
    the K scheduler the command path is rewritten to ``./<basename>``,
    matching where the K job script stages the command in.  For any other
    scheduler the command line is returned unchanged.

    Args:
        cmdline: Command path followed by its arguments, or None/blank when
            an optional command (kv generator, reducer) was not given.
        sched: Scheduler name, compared case-insensitively with 'K'.

    Returns:
        The (possibly rewritten) command line; *cmdline* itself when it is
        None or blank.
    """
    tokens = cmdline.split() if cmdline else []
    if not tokens:
        # Optional commands may be absent; nothing to validate or rewrite.
        return cmdline
    _check_exist(tokens[0])
    if sched.upper() == 'K':
        tokens[0] = './' + os.path.basename(tokens[0])
        return ' '.join(tokens)
    return cmdline
def check_indir(dirname, sched):
    """Validate the input directory and return the path the job should use.

    For the K scheduler the directory is staged in under its basename, so
    ``./<basename>`` is returned.  Trailing whitespace and slashes are
    stripped first, because they would make ``basename`` return ''.  Other
    schedulers get *dirname* back unchanged.

    Args:
        dirname: Input directory path given by the user.
        sched: Scheduler name, compared case-insensitively with 'K'.

    Returns:
        The directory path to pass to kmrrun.

    Exits:
        Via ``_check_exist`` when *dirname* does not exist.
    """
    _check_exist(dirname)
    if sched.upper() != 'K':
        return dirname
    return './' + os.path.basename(dirname.rstrip().rstrip('/'))
def check_restart(restart_basename, procstr, sched):
    """Validate checkpoint directories in the current directory before a run.

    Checkpoint directories are named ``<restart_basename>.NNNNN`` on the K
    scheduler and ``ckptdirNNNNN`` otherwise.  If none exist there is
    nothing to check.  Otherwise:
      * ``<prefix>00000/nprocs`` must exist and hold ``...=<count>``;
      * that count must equal the number of checkpoint directories;
      * the new run must not use more processes than the checkpointed run.
    Restarting with fewer processes is allowed and reported as
    "Reduction mode".

    Args:
        restart_basename: Checkpoint directory prefix (used on K only).  On
            K, None means "not restarting" and skips all checks.
        procstr: Process count/shape string accepted by ``k_node_to_int``.
        sched: Scheduler name, compared case-insensitively with 'K'.

    Exits:
        With status 1, after printing an error to stderr, on any mismatch.
    """
    if sched.upper() == 'K':
        if restart_basename is None:
            return
        ckpt_prefix = restart_basename + '.'
    else:
        ckpt_prefix = 'ckptdir'
    # The prefix is user-supplied (and ends in '.' on K): escape it so it
    # is matched literally rather than as a regex.
    repatter = re.compile(r'^%s\d+$' % re.escape(ckpt_prefix))
    count = sum(1 for entry in os.listdir('./') if repatter.match(entry))
    if count == 0:
        return

    nprocs_file = ckpt_prefix + '00000/nprocs'
    if not os.path.exists(nprocs_file):
        sys.stderr.write('Error: Checkpoint nproc file %s does not exist.\n'
                         % nprocs_file)
        sys.exit(1)
    try:
        with open(nprocs_file) as nprocs_fp:
            preproc = int(nprocs_fp.read().split('=')[1])
    except (IndexError, ValueError):
        sys.stderr.write('Error: Malformed checkpoint nproc file %s.\n'
                         % nprocs_file)
        sys.exit(1)

    if count != preproc:
        sys.stderr.write('Error: Do not match number of checkpoint file and '
                         'executed process. ***\n')
        sys.exit(1)
    proc = k_node_to_int(procstr)
    if proc > preproc:
        sys.stderr.write('Error: On restart, increasing number of process is '
                         'not supported. ***\n')
        sys.exit(1)
    if proc < preproc:
        sys.stderr.write("*** Reduction mode. ***\n")
def k_node_to_int(shape_str):
    """Return the total count described by a K node/process shape string.

    Accepts ``N``, ``NxM`` or ``NxMxL`` with an optional ``:strict``
    suffix, and returns the product of the given dimensions.  An int (or
    anything whose ``str()`` is such a shape) is accepted as well.

    Args:
        shape_str: Shape specification, e.g. '8', '2x3', '2x2x2:strict'.

    Returns:
        The product of the dimensions as an int.

    Raises:
        ValueError: if *shape_str* does not start with a dimension.
    """
    m = re.match(r"(\d+)x?(\d+)?x?(\d+)?(:strict)?", str(shape_str))
    if m is None:
        # Previously this surfaced as AttributeError on None.groups().
        raise ValueError('invalid node shape: %r' % (shape_str,))
    total = 1
    for dim in m.groups()[0:3]:
        if dim:
            total *= int(dim)
    return total
def k_scheduler(name, queue, rsctime, node, kmrrun_path, kmrrun_parameter,
                template_path, shape, proc, mapper, kvgen, reducer, indir,
                ckpt, restart_basename):
    """Render a K computer / FX10 job script from the template file.

    The template is filled with ``%``-style keys NAME, QUEUE, NODE,
    RSCTIME, KMRRUN, SHAPE, PROC, DATASTGIN, DATASTGOUT and EXEC.

    DATASTGIN holds one ``#PJM --stgin`` directive per line:
      * one for each given command (mapper, kvgen, reducer), staged in
        under its basename;
      * one for the input directory;
      * on restart, one per checkpoint rank listed in the
        ``<basename(restart_basename)>.00000/nprocs`` file.

    DATASTGOUT holds a placeholder comment for user stage-outs.  When
    checkpointing or restart is enabled, it also holds one
    ``#PJM --stgout`` directive per process for the checkpoint
    directories.

    Args:
        name, queue, rsctime, node, shape: Values substituted verbatim.
        kmrrun_path: Path of the kmrrun binary (KMRRUN key).
        kmrrun_parameter: Arguments appended to ``./kmrrun`` in EXEC.
        template_path: Path of the job-script template file.
        proc: Process count/shape string (see ``k_node_to_int``).
        mapper, kvgen, reducer: Original command lines; None/blank skips.
        indir: Input directory path on the submitting host.
        ckpt: True when checkpointing is enabled.
        restart_basename: Checkpoint directory prefix, or None/'' if not
            restarting.

    Returns:
        The rendered job script text.
    """
    nprocs = k_node_to_int(proc)

    stgin = []
    for cmdline in (mapper, kvgen, reducer):
        tokens = cmdline.split() if cmdline else []
        if tokens:
            stgin.append('#PJM --stgin "%s %s"'
                         % (tokens[0], os.path.basename(tokens[0])))
    indir_stgin = './' + os.path.basename(indir.rstrip().rstrip('/'))
    stgin.append('#PJM --stgin "%s/* %s/"' % (indir, indir_stgin))
    if restart_basename:
        nprocs_file = os.path.basename(restart_basename) + '.00000/nprocs'
        with open(nprocs_file) as nprocs_fp:
            ckpt_nprocs = int(nprocs_fp.read().split('=')[1])
        for rank in range(ckpt_nprocs):
            stgin.append('#PJM --stgin "./%s.%05d/* ./ckptdir%05d/"'
                         % (restart_basename, rank, rank))

    stgout = ["#\n# !!WRITE STGOUT HERE!!\n#"]
    if ckpt or restart_basename:
        for rank in range(nprocs):
            # '%%j' yields a literal '%j' in the directive (presumably the
            # PJM job-id placeholder).
            stgout.append('#PJM --stgout "./ckptdir%05d/* '
                          './ckptdir_%%j.%05d/"' % (rank, rank))

    execstr = 'mpiexec -n %d ./kmrrun %s' % (nprocs, kmrrun_parameter)

    with open(template_path) as template_fp:
        template = template_fp.read()
    return template % {'NAME': name, 'QUEUE': queue, 'NODE': node,
                       'RSCTIME': rsctime, 'KMRRUN': kmrrun_path,
                       'SHAPE': shape, 'PROC': proc,
                       'DATASTGIN': '\n'.join(stgin),
                       'DATASTGOUT': '\n'.join(stgout),
                       'EXEC': execstr}
def focus_scheduler(name, queue, rsctime, node, kmrrun_path, kmrrun_parameter,
                    template_path):
    """Render a Focus supercomputer job script from the template file.

    The template is filled with ``%``-style keys NAME, QUEUE, NODE,
    RSCTIME, KMRRUN and KMRRUN_PARAM.

    Args:
        name, queue, rsctime, node: Values substituted verbatim.
        kmrrun_path: Path of the kmrrun binary (KMRRUN key).
        kmrrun_parameter: kmrrun arguments (KMRRUN_PARAM key).
        template_path: Path of the job-script template file.

    Returns:
        The rendered job script text.
    """
    # 'with' closes the template file deterministically.
    with open(template_path) as template_fp:
        template = template_fp.read()
    return template % {'NAME': name, 'QUEUE': queue, 'NODE': node,
                       'RSCTIME': rsctime, 'KMRRUN': kmrrun_path,
                       'KMRRUN_PARAM': kmrrun_parameter}
def select_scheduler(opts, sched):
    """Build the kmrrun argument string and emit the job script for *sched*.

    The script is written to stdout, or to ``opts.scrfile`` when that is
    set.

    Args:
        opts: Parsed command-line options.  Reads queue, node, rsctime,
            taskproc, mapper, kvgen, reducer, ckpt, restart, indir, shape,
            proc and scrfile.
        sched: Scheduler name, 'K' or 'FOCUS' (case-insensitive).

    Exits:
        With status 1 if kmrrun cannot be found or *sched* is unknown.
    """
    # NOTE(review): `kmrhome` is a module-level name defined outside this
    # view; presumably the KMR installation prefix.
    template_dir = kmrhome + '/lib'
    kmrrun_path = template_dir + '/kmrrun'
    if not os.path.exists(kmrrun_path):
        # Fall back to a sibling kmrrun/ directory (presumably a build tree).
        kmrrun_path = template_dir + '/../kmrrun/kmrrun'
        if not os.path.exists(kmrrun_path):
            sys.stderr.write('Error: could not find kmrrun utility.\n')
            sys.exit(1)

    queue = opts.queue
    node = opts.node
    # Read from the parameter, not the script-level `options` global, so
    # this function also works when imported.
    rsctime = opts.rsctime

    params = []
    if opts.taskproc:
        params.append('-n %s' % (opts.taskproc,))
    for flag, cmdline in (('-m', opts.mapper), ('-k', opts.kvgen),
                          ('-r', opts.reducer)):
        # kvgen/reducer are optional; only validate the ones given.
        if cmdline:
            params.append('%s "%s"' % (flag, check_cmdline(cmdline, sched)))
    if opts.ckpt or opts.restart:
        params.append('--ckpt')
    params.append(check_indir(opts.indir, sched))
    kmrrun_parameter = ' '.join(params)

    # NOTE(review): job name reconstructed from a garbled source; confirm.
    name = 'kmrrun_job'

    if sched.upper() == 'K':
        script = k_scheduler(name, queue, rsctime, node, kmrrun_path,
                             kmrrun_parameter,
                             template_dir + '/kmrrungenscript.template.k',
                             opts.shape, opts.proc, opts.mapper, opts.kvgen,
                             opts.reducer, opts.indir,
                             opts.ckpt, opts.restart)
    elif sched.upper() == 'FOCUS':
        script = focus_scheduler(name, queue, rsctime, node, kmrrun_path,
                                 kmrrun_parameter,
                                 template_dir
                                 + '/kmrrungenscript.template.focus')
    else:
        sys.stderr.write('Unknown scheduler\n')
        sys.exit(1)

    if opts.scrfile is None:
        sys.stdout.write(script + '\n')
    else:
        with open(opts.scrfile, "w") as out:
            out.write(script + '\n')
def warn_stageout(opts):
    """Remind the user to add stage-out directives to a K job script.

    Nothing is printed for schedulers other than K (compared
    case-insensitively, like the rest of the program).  When checkpointing
    or restart is enabled, the message also states that only checkpoint
    files are staged out.

    Args:
        opts: Parsed options; reads ``sched``, ``ckpt`` and ``restart``.
    """
    if opts.sched.upper() != 'K':
        return
    rule = '#########################################################################'
    lines = [rule,
             "Don't forget to write stage-out directives for MapReduce "
             "output files."]
    if opts.ckpt or opts.restart:
        lines.append('A job script generated by this program stages-out '
                     'only checkpoint files.')
    lines.append(rule)
    sys.stderr.write('\n'.join(lines) + '\n')
if __name__ == "__main__":
    # Command-line entry point: parse options, validate them, check
    # checkpoint state, then write the job script and a stage-out reminder.

    usage = "usage: %prog [options] -m mapper [-k keygener -r reducer]"
    parser = OptionParser(usage)

    parser.add_option("-q", dest="queue", type="string",
                      help="queue to submit your job")
    parser.add_option("-t", dest="rsctime", type="string", default='00:10:00',
                      help="job execution time (default is '00:10:00')")
    parser.add_option("-e", dest="node", type="string", default='12',
                      help="number of node (default is '12')")
    parser.add_option("-s", dest="shape", type="string", default='1',
                      help="mpi process shape. "
                           "Valid only on K scheduler. (default is '1')")
    parser.add_option("-p", dest="proc", type="string", default='8',
                      help="number of mpi processes. "
                           "Valid only on K scheduler. (default is '8')")
    parser.add_option("-d", dest="indir", type="string", default='./input',
                      help="input file directory. "
                           "When used on K computer, this directory should be one "
                           "located in K global storage that is staged-in. "
                           "(default is './input')")
    parser.add_option("-n", dest="taskproc", type="string",
                      help="number of processes to run each mapper/reducer")
    parser.add_option("-m", dest="mapper", type="string",
                      help="mapper command path and its arguments")
    parser.add_option("-k", dest="kvgen", type="string",
                      help="kv generator command path and its arguments")
    parser.add_option("-r", dest="reducer", type="string",
                      help="reducer command path and its arguments")
    parser.add_option("-C", dest="ckpt", action="store_true", default=False,
                      help="enable Checkpoint/Restart (default is false)")
    parser.add_option("-R", "--restart-filename", dest="restart",
                      type="string",
                      help="specify prefix of directories where checkpoint "
                           "files are located. "
                           "This option should be given when restarting on "
                           "a system that requires staging. "
                           "Valid only on K scheduler.")
    parser.add_option("-S", dest="sched", type="string", default='K',
                      help="scheduler type. "
                           "Specify Scheduler 'K' or 'FOCUS'. "
                           "'K' supports K computer/FX10 and 'FOCUS' supports "
                           "Focus supercomputer. (default is 'K')")
    parser.add_option("-w", "--write-scriptfile", dest="scrfile",
                      type="string",
                      help="output job script filename")

    (options, args) = parser.parse_args()

    # Everything is passed through options; stray positional arguments are
    # almost certainly a quoting mistake (e.g. an unquoted -m command line).
    if args:
        parser.error("Error: unexpected argument(s): %s" % ' '.join(args))

    if not options.mapper:
        sys.stderr.write("Error: Mapper is not specified\n")
        sys.exit(1)
    if options.reducer and not options.kvgen:
        sys.stderr.write(
            "Error: Specify kv generator when reducer is specified\n")
        sys.exit(1)

    # Case-insensitive, matching every other scheduler comparison; with a
    # case-sensitive test '-S k' would take the non-K branch here.
    if options.sched.upper() == 'K':
        check_restart(options.restart, options.proc, 'K')
    else:
        check_restart(options.restart, '1', options.sched)

    select_scheduler(options, options.sched)
    warn_stageout(options)