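# VisIt custom launcher for the University of Arizona Puma/Ocelote/ElGato clusters.
# Note: a script like this is normally executed inside VisIt's launch framework
# (internallauncher), which is expected to provide JobSubmitter, MainLauncher, and
# GETENV in the execution namespace. The standard-library imports below are the
# ones this file uses directly.
import os
import subprocess
import time
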
###############################################################################
# Class: PumaSubmitter_sbatch
#
# Purpose: Launch an MPI job with sbatch.
#
# Programmer: Devin Bayly (extending Brad Whitlock's work)
# Date: Thu May 17 14:22:04 PDT 2012
#
# Modifications:
# Satheesh Maheswaran, Tue Jan 15 14:47:54 GMT 2013
# I added a job name option for the Slurm scheduler and support for mpirun launcher arguments
#
# Paul Selby, Thu May 16 12:06:30 BST 2013
# Use launchargs if supplied
#
# Eric Brugger, Fri Feb 28 10:19:02 PST 2014
# I added a fix from Jean Favre that corrected the setting of the -N option
# for aprun.
#
# David Camp, Thu Oct 1 11:37:53 PDT 2015
# Update sbatch to be more like qsub to support new NERSC system, Cori.
# Also added the srun mpi launcher option.
#
# Devin Bayly, Monday Dec 6 16:51:34 2021
# Added the module load calls that are necessary for our site
#
###############################################################################
class PumaSubmitter_sbatch(JobSubmitter):
    def __init__(self, launcher):
        super(PumaSubmitter_sbatch, self).__init__(launcher)
    def Executable(self):
        return ["sbatch"]
    def CreateFilename(self):
        tdate = time.asctime()[11:19]
        tuser = self.launcher.username()
        return os.path.join("/tmp", "visit.%s.%s" % (tuser, tdate))
    def HandledHardwareArguments(self):
        return 1
    def TFileLoadModules(self, tfile):
        print("attempting to add a module load call")
        tfile.write("source /etc/profile.d/lmod.sh\n")
        tfile.write("module load ohpc\n")
        return
    def TFileSetup(self, tfile):
        tfile.write("cd %s\n" % os.path.abspath(os.curdir))
        tfile.write("ulimit -c 0\n")
        #tfile.write("# Submitted on host %s\n" % self.launcher.hostname())
        #for v in ("LIBPATH", "LD_LIBRARY_PATH", "VISITHOME", "VISITARCHHOME", "VISITPLUGINDIR"):
        #    tfile.write('echo "%s=$%s"\n' % (v,v))
    def Writable(self, d):
        return self.launcher.writepermission(d)
    def HandleCWDPermissions(self):
        args = []
        if not self.Writable(os.path.abspath(os.curdir)):
            msg = """
The directory you started VisIt in does not have write permission.
Using /dev/null for the batch job standard error and standard
output streams.
"""
            self.launcher.warning(msg)
            args = args + ["-e", "/dev/null", "-o", "/dev/null"]
        return args
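    # AddEnvironment assembles a single sbatch --export=... argument so the
    # environment VisIt was started with (library paths, VISITHOME, the plugin
    # directory) is propagated into the batch job.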
    def AddEnvironment(self):
        env = "--export="
        env = env + "HOME=" + GETENV("HOME")
        env = env + ",LIBPATH=" + GETENV("LIBPATH")
        env = env + ",LD_LIBRARY_PATH=" + GETENV("LD_LIBRARY_PATH")
        env = env + ",VISITHOME=" + GETENV("VISITHOME")
        env = env + ",VISITARCHHOME=" + GETENV("VISITARCHHOME")
        env = env + ",VISITPLUGINDIR=" + GETENV("VISITPLUGINDIR")
        return [env]
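    # The helpers below each build the command line for one MPI sub-launcher
    # (mpiexec, aprun, mpirun, ibrun, srun): optional sub-launch arguments,
    # then task/node counts, then the VisIt executable and its arguments.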
    def mpiexec(self):
        return ["mpiexec"]
    def mpiexec_args(self, args):
        mpicmd = self.mpiexec()
        if self.parallel.sublaunchargs != None:
            mpicmd = mpicmd + self.parallel.sublaunchargs
        if self.parallel.np != None:
            mpicmd = mpicmd + ["-n", self.parallel.np]
        if self.parallel.nn != None:
            mpicmd = mpicmd + [self.PPNArgument(), self.PPN()]
        if self.parallel.machinefile != None:
            mpicmd = mpicmd + ["-machinefile", self.parallel.machinefile]
        mpicmd = mpicmd + self.VisItExecutable() + args
        return mpicmd
    def aprun(self):
        return ["aprun"]
    def aprun_args(self, args):
        mpicmd = self.aprun()
        if self.parallel.sublaunchargs != None:
            mpicmd = mpicmd + self.parallel.sublaunchargs
        if self.parallel.np != None:
            mpicmd = mpicmd + ["-n", self.parallel.np]
        if self.parallel.nn != None:
            mpicmd = mpicmd + ["-N", self.PPN()]
        mpicmd = mpicmd + self.VisItExecutable() + args
        return mpicmd
    def mpirun(self):
        return ["mpirun"]
    def mpirun_args(self, args):
        mpicmd = self.mpirun()
        if self.parallel.sublaunchargs != None:
            mpicmd = mpicmd + self.parallel.sublaunchargs
        if self.parallel.np != None:
            mpicmd = mpicmd + ["-np", self.parallel.np]
        if self.parallel.nn != None:
            mpicmd = mpicmd + [self.PPNArgument(), self.PPN()]
        if self.parallel.machinefile != None:
            mpicmd = mpicmd + ["-machinefile", self.parallel.machinefile]
        mpicmd = mpicmd + self.VisItExecutable() + args
        return mpicmd
    def ibrun(self):
        return ["ibrun"]
    def ibrun_args(self, args):
        mpicmd = self.ibrun()
        if self.parallel.sublaunchargs != None:
            mpicmd = mpicmd + self.parallel.sublaunchargs
        mpicmd = mpicmd + self.VisItExecutable() + args
        return mpicmd
    def srun(self):
        return ["srun"]
    def srun_args(self, args):
        mpicmd = self.srun()
        if self.parallel.sublaunchargs != None:
            mpicmd = mpicmd + self.parallel.sublaunchargs
        if self.parallel.np != None:
            mpicmd = mpicmd + ["--ntasks=%s" % self.parallel.np]
        if self.parallel.nn != None:
            mpicmd = mpicmd + ["--nodes=%s" % self.parallel.nn]
            mpicmd = mpicmd + ["--tasks-per-node=%s" % self.PPN()]
        mpicmd = mpicmd + self.VisItExecutable() + args
        return mpicmd
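    # CreateCommand is the heart of the submitter: it builds the sbatch argument
    # list (partition, account, time, task/node counts, job name), picks the
    # sub-launcher command for the MPI step, writes both into a temporary shell
    # script via TFileLoadModules/TFileSetup, and returns sbatch plus that script
    # as the command VisIt should run.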
    def CreateCommand(self, args, debugger):
        parcmd = self.Executable()
        parcmd = parcmd + self.HandleCWDPermissions()
        parcmd = parcmd + self.AddEnvironment()
        if self.parallel.launchargs != None:
            parcmd = parcmd + self.parallel.launchargs
        if self.parallel.partition != None:
            parcmd = parcmd + ["--partition=%s" % self.parallel.partition]
        if self.parallel.bank != None:
            parcmd = parcmd + ["--account=%s" % self.parallel.bank]
        if self.parallel.time != None:
            parcmd = parcmd + ["--time=%s" % self.parallel.time]
        if self.parallel.np != None:
            parcmd = parcmd + ["--ntasks=%s" % self.parallel.np]
        if self.parallel.nn != None:
            parcmd = parcmd + ["--nodes=%s" % self.parallel.nn]
            parcmd = parcmd + ["--tasks-per-node=%s" % self.PPN()]
        if self.parallel.name != None:
            parcmd = parcmd + ["--job-name=%s" % self.parallel.name]
        sbatch, sublauncher = self.LauncherAndSubLauncher()
        if sublauncher == "srun":
            mpicmd = self.srun_args(args)
        elif sublauncher == "mpiexec":
            mpicmd = self.mpiexec_args(args)
        elif sublauncher == "aprun":
            mpicmd = self.aprun_args(args)
        elif sublauncher == "mpirun":
            mpicmd = self.mpirun_args(args)
        elif sublauncher == "ibrun":
            mpicmd = self.ibrun_args(args)
        else:
            mpicmd = self.VisItExecutable() + args
        mpicmd = debugger.CreateCommand(mpicmd)
        # Create the tfile
        tfilename = self.CreateFilename()
        try:
            tfile = open(tfilename, "wt")
            tfile.write("#!/bin/sh\n")
            self.TFileLoadModules(tfile)
            self.TFileSetup(tfile)
            if self.parallel.hw_precmd != None:
                tfile.write(self.parallel.hw_precmd + "\n")
            if self.parallel.sublaunchprecmd != None:
                tfile.write(" ".join(self.parallel.sublaunchprecmd) + "\n")
            tfile.write(" ".join(mpicmd) + "\n")
            if self.parallel.sublaunchpostcmd != None:
                tfile.write(" ".join(self.parallel.sublaunchpostcmd) + "\n")
            if self.parallel.hw_postcmd != None:
                tfile.write(self.parallel.hw_postcmd + "\n")
            tfile.write("echo 'reached bottom of the sbatch generated file in /tmp' \n")
            tfile.close()
            os.chmod(tfilename, 0o775)
        except (OSError, IOError):
            exit("Could not create script file to launch %s job." % self.parallel.launcher, 0)
        # The parallel command is the tfile script we just made.
        parcmd = parcmd + [tfilename]
        print("double checking we made it this far, parcmd is ", parcmd)
        return parcmd
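# For illustration only: with the srun sub-launcher, 2 nodes and 8 tasks, the
# generated /tmp script would look roughly like the sketch below (the engine
# path and working directory are placeholders that depend on the installation):
#
#   #!/bin/sh
#   source /etc/profile.d/lmod.sh
#   module load ohpc
#   cd /path/where/visit/was/started
#   ulimit -c 0
#   srun --ntasks=8 --nodes=2 --tasks-per-node=4 /path/to/visit/engine_par ...
#   echo 'reached bottom of the sbatch generated file in /tmp'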
###############################################################################
# Class: PumaLauncher
#
# Purpose: Custom launcher for the University of Arizona HPC clusters,
# adapted from the Ohio Supercomputer Center launcher.
# Programmer:
# Date: Tues Nov 1 2016
#
# Customization issues:
# 1) Exposure of locally related PATH and LD_LIBRARY_PATH by using Lmod
# 2) Absolute path for qsub
# 3) Bypass PPNArgument and favor Argonne mpiexec
# 4) PBS_HOME needs to be set for the OSC submitfilter to work
# 5) Only Cluster Currently Supported is Owens
#
# Thanks to Allen Sanderson for the help!
#
# Modifications:
# Kevin Manalo
# Zhi-Qiang You, Wed Oct 7 2021
# 1) Remove PBS implementation and use internal launcher.
# Devin Bayly
# Changed the custom message and added an option to switch between clusters
#
###############################################################################
class PumaLauncher(MainLauncher):
    def __init__(self):
        super(PumaLauncher, self).__init__()
    def Customize(self):
        msg = """ University of Arizona - Custom Launch File \n"""
        print(msg)
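    # call() intercepts the command VisIt is about to run. When the command is
    # an sbatch submission, it pulls a cluster keyword (elgato/ocelote, defaulting
    # to puma) out of the argument list, then pipes "slurm-selector.sh <cluster>"
    # followed by the sbatch command into /bin/bash so the job lands on the
    # selected cluster. Anything else is executed unchanged.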
    def call(self, args, stdinpipe=0, env=None):
        ## add in a cluster switch arg
        print("original args", args)
        if "sbatch" == args[0]:
            cluster = ""
            if "elgato" in args:
                print("elgato found")
                cluster = "elgato"
                args.remove("elgato")
            elif "ocelote" in args:
                print("ocelote found")
                cluster = "ocelote"
                args.remove("ocelote")
            else:
                print("puma found")
                cluster = "puma"
                # puma is the default; the keyword may not be present in the args
                if "puma" in args:
                    args.remove("puma")
            ## remove the -la from the list
            #ind = args.index("-la")
            #args.pop(ind)
            #args.pop(ind)
            #print("after popping",args)
            ## by popping twice we also remove the argument that came after the -la
            amended = """
. /usr/local/bin/slurm-selector.sh {}
{}
""".format(cluster, ' '.join(args)).strip()
            print(amended)
            p = subprocess.Popen("/bin/bash", stdin=subprocess.PIPE, env=env)
            p.communicate(amended.encode())
            ret = p.returncode
            return ret
        else:
            p = subprocess.Popen(args, env=env)
            p.communicate()
            ret = p.returncode
            return ret
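    # Always hand parallel launches to the sbatch submitter above, regardless of
    # the launch method requested in the host profile.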
    def JobSubmitterFactory(self, launch):
        return PumaSubmitter_sbatch(self)

# Launcher creation function
def createlauncher():
    return PumaLauncher()
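# Deployment note (assumption, not verified here): VisIt discovers site launchers
# like this one when the file is saved under the name "customlauncher" in a
# location its internallauncher searches (for example the installation's host
# resources directory); internallauncher then calls createlauncher() to obtain
# the PumaLauncher instance. Consult the VisIt host-profile documentation for
# the exact path on your installation.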