from typing import Any, Dict, Tuple
import os
import time
import itertools
import subprocess
import multiprocessing
from . import util, errors, space
class Destination(object):
    """Describes where the result of a processing run (a Multiverse) goes:
    kept in memory, written to a temporary (intermediate) file, or written
    to the final, optionally numbered, output file(s)."""

    # class-level defaults so unset attributes safely read as None
    type = filename = overwrite = value = config = limits = None
    opts = {}  # type: Dict[Any, Any]

    def __init__(self):
        # BUGFIX: give every instance its own options dict. The class-level
        # 'opts' is a single mutable dict shared by all instances; any
        # in-place mutation would leak between Destination objects.
        self.opts = {}

    def set_final_filename(self, filename, overwrite):
        """Deliver output to *filename*; when *overwrite* is false an unused
        filename is derived at store time instead of clobbering."""
        self.type = "final"
        self.filename = filename
        self.overwrite = overwrite

    def set_final_options(self, opts):
        """Store str.format() keyword arguments used to expand the filename."""
        if opts is not False:
            self.opts = opts

    def set_limits(self, limits):
        # limits control the one-file-per-limit-set expansion in final_filenames()
        self.limits = limits

    def set_config(self, conf):
        # configuration attached to each space before it is written out
        self.config = conf

    def set_tmp_filename(self, filename):
        """Deliver output to a temporary (intermediate) file."""
        self.type = "tmp"
        self.filename = filename

    def set_memory(self):
        """Keep output in memory; fetch it back with retrieve()."""
        self.type = "memory"

    def store(self, verse):
        """Deliver *verse* according to the configured destination type.

        Raises ValueError when the Multiverse is empty (dimension 0).
        """
        self.value = None
        if verse.dimension == 0:
            raise ValueError("Empty output, Multiverse contains no spaces")
        if self.type == "memory":
            self.value = verse
        elif self.type == "tmp":
            verse.tofile(self.filename)
        elif self.type == "final":
            for sp, fn in zip(verse.spaces, self.final_filenames()):
                sp.config = self.config
                sp.tofile(fn)

    def retrieve(self):
        """Return the stored value for a memory destination (None otherwise)."""
        if self.type == "memory":
            return self.value

    def final_filenames(self):
        """Expand the configured filename into one concrete path per output
        space: apply the format options and, unless overwrite is set, pick
        an unused filename."""

        def _resolve(fn):
            # shared tail of both branches: format, then dodge existing files
            fn = fn.format(**self.opts)
            if not self.overwrite:
                fn = util.find_unused_filename(fn)
            return fn

        if self.limits is None:
            return [_resolve(self.filename)]
        base, ext = os.path.splitext(self.filename)
        return [
            _resolve(base + "_" + limlabel + ext)
            for limlabel in util.limit_to_filelabel(self.limits)
        ]
class DispatcherBase(util.ConfigurableObject):
    """Common behaviour for all dispatchers: destination configuration and
    optional streaming of result spaces to a listening GUI over a socket."""

    def __init__(self, config, main):
        self.main = main
        super(DispatcherBase, self).__init__(config)

    def parse_config(self, config):
        super(DispatcherBase, self).parse_config(config)
        self.config.destination = Destination()
        # optional, 'output.hdf5' by default
        destination = config.pop("destination", "output.hdf5")
        # by default: numbered files in the form output_#.hdf5
        overwrite = util.parse_bool(config.pop("overwrite", "false"))
        # explicitly parsing the options first helps with the debugging
        self.config.destination.set_final_filename(destination, overwrite)
        # IP address of the running gui awaiting the spaces
        self.config.host = config.pop("host", None)
        # port of the running gui awaiting the spaces
        self.config.port = config.pop("port", None)
        # previewing the data; if true, also specify host and port
        self.config.send_to_gui = util.parse_bool(config.pop("send_to_gui", "false"))

    # provides the possibility to send the results to the gui over the network
    def send(self, verses):
        """Yield every Multiverse in *verses* unchanged, optionally sending
        each contained Space to the listening GUI first.

        Sending happens when send_to_gui is set, or when both host and port
        are configured.
        """
        # BUGFIX: the original tested 'self.config.host is not None' twice;
        # the second test must be on the port, otherwise a configured host
        # without a port would crash on int(None) below.
        if self.config.send_to_gui or (
            self.config.host is not None and self.config.port is not None
        ):
            for M in verses:
                if self.config.destination.limits is None:
                    sp = M.spaces[0]
                    if isinstance(sp, space.Space):
                        util.socket_send(
                            self.config.host,
                            int(self.config.port),
                            util.serialize(sp, ",".join(self.main.config.command)),
                        )
                else:
                    # one space per limit set: label each with its limits
                    for sp, label in zip(
                        M.spaces,
                        util.limit_to_filelabel(self.config.destination.limits),
                    ):
                        if isinstance(sp, space.Space):
                            util.socket_send(
                                self.config.host,
                                int(self.config.port),
                                util.serialize(
                                    sp,
                                    "{0}_{1}".format(
                                        ",".join(self.main.config.command), label
                                    ),
                                ),
                            )
                yield M
        else:
            for M in verses:
                yield M

    def has_specific_task(self):
        # base dispatchers have no subprocess-specific task
        return False

    def process_jobs(self, jobs):
        # subclasses yield a result (or job handle) per job
        raise NotImplementedError

    def sum(self, results):
        # subclasses reduce the stream of results into the final output
        raise NotImplementedError
# The simplest possible dispatcher. Does the work all by itself on a single
# thread/core/node. 'Local' will most likely suit your needs better.
class SingleCore(DispatcherBase):
    """The simplest possible dispatcher: does all the work itself on a
    single thread/core/node. 'Local' will most likely suit your needs
    better."""

    def process_jobs(self, jobs):
        """Process each job sequentially in the current process."""
        yield from (self.main.process_job(single_job) for single_job in jobs)

    def sum(self, results):
        """Stream results through send() and reduce them chunk-wise."""
        return space.chunked_sum(self.send(results))
# Base class for Dispatchers using subprocesses to do some work.
class ReentrantBase(DispatcherBase):
    """Base class for Dispatchers using subprocesses to do some work."""

    actions = ("user",)  # type: Tuple[str, ...]

    def parse_config(self, config):
        super(ReentrantBase, self).parse_config(config)
        requested = config.pop("action", "user").lower()
        self.config.action = requested
        if requested not in self.actions:
            raise errors.ConfigError(
                "action {0} not recognized for {1}".format(
                    requested, self.__class__.__name__
                )
            )

    def has_specific_task(self):
        """Any action other than 'user' is a subprocess-specific task."""
        return self.config.action != "user"

    def run_specific_task(self, command):
        # subclasses implement the subprocess entry point
        raise NotImplementedError
# Dispatch multiple worker processes locally, while doing the
# summation in the main process
class Local(ReentrantBase):
    """Dispatch multiple worker processes locally, while doing the
    summation in the main process."""

    # OFFICIAL API
    actions = "user", "job"

    def parse_config(self, config):
        super(Local, self).parse_config(config)
        # optionally, specify number of cores (autodetect by default)
        self.config.ncores = int(config.pop("ncores", 0))
        if self.config.ncores <= 0:
            self.config.ncores = multiprocessing.cpu_count()

    def process_jobs(self, jobs):
        """Yield job results as they complete (unordered) from a pool of
        worker processes."""
        # note: SingleCore will be marginally faster for trivial workloads
        configs = (self.prepare_config(job) for job in jobs)
        # BUGFIX: the original created the Pool but never closed or joined
        # it, leaking worker processes. The context manager guarantees the
        # pool is shut down even if the consumer abandons this generator.
        # Renamed the local that shadowed the builtin 'map' as well.
        with multiprocessing.Pool(self.config.ncores) as pool:
            for result in pool.imap_unordered(self.main.get_reentrant(), configs):
                yield result

    def sum(self, results):
        """Stream results through send() and reduce them chunk-wise."""
        return space.chunked_sum(self.send(results))

    def run_specific_task(self, command):
        """Entry point for the re-spawned worker process: run the single
        job stored in the config and store its result (in memory)."""
        if command:
            raise errors.SubprocessError(
                "invalid command, too many parameters: '{0}'".format(command)
            )
        if self.config.action == "job":
            result = self.main.process_job(self.config.job)
            self.config.destination.store(result)

    # UTILITY
    def prepare_config(self, job):
        """Clone the main config for one job, redirected to in-memory
        storage; the (config, ()) tuple is handed to the reentrant callable."""
        config = self.main.clone_config()
        config.dispatcher.destination.set_memory()
        config.dispatcher.action = "job"
        config.dispatcher.job = job
        return config, ()
# Dispatch many worker processes on an Oar cluster.
class Oar(ReentrantBase):
    """Dispatch many worker processes on an OAR cluster: submit jobs with
    oarsub, poll them with oarstat, and clean up intermediate files."""

    # OFFICIAL API
    actions = "user", "process"

    def parse_config(self, config):
        super(Oar, self).parse_config(config)
        # Optional, current directory by default
        self.config.tmpdir = config.pop("tmpdir", os.getcwd())
        # optionally, tweak oarsub parameters
        self.config.oarsub_options = config.pop("oarsub_options", "walltime=0:15")
        # optionally, override default location of python and/or
        # BINoculars installation
        self.config.executable = config.pop(
            "executable", " ".join(util.get_python_executable())
        )

    def process_jobs(self, jobs):
        """Cluster *jobs* by weight, submit one OAR job per cluster plus a
        final job that sums all intermediate outputs; yield OAR job ids."""
        self.configfiles = []
        self.intermediates = []
        clusters = util.cluster_jobs2(jobs, self.main.input.config.target_weight)
        for jobscluster in clusters:
            uniq = util.uniqid()
            jobconfig = os.path.join(
                self.config.tmpdir, "binoculars-{0}-jobcfg.zpi".format(uniq)
            )
            self.configfiles.append(jobconfig)
            config = self.main.clone_config()
            interm = os.path.join(
                self.config.tmpdir, "binoculars-{0}-jobout.hdf5".format(uniq)
            )
            self.intermediates.append(interm)
            # each cluster writes its partial result to a tmp file
            config.dispatcher.destination.set_tmp_filename(interm)
            config.dispatcher.sum = ()
            config.dispatcher.action = "process"
            config.dispatcher.jobs = jobscluster
            util.zpi_save(config, jobconfig)
            yield self.oarsub(jobconfig)
        # once all jobs are sent to the cluster, submit the process that
        # sums all the other jobs' intermediate files
        uniq = util.uniqid()
        jobconfig = os.path.join(
            self.config.tmpdir, "binoculars-{0}-jobcfg.zpi".format(uniq)
        )
        self.configfiles.append(jobconfig)
        config = self.main.clone_config()
        config.dispatcher.sum = self.intermediates
        config.dispatcher.action = "process"
        config.dispatcher.jobs = ()
        util.zpi_save(config, jobconfig)
        yield self.oarsub(jobconfig)

    def sum(self, results):
        """Wait until all submitted OAR jobs (the ids yielded by
        process_jobs) have finished, then remove the temporary files."""
        jobs = list(results)
        jobscopy = jobs[:]  # oarwait empties the list it is given
        self.oarwait(jobs)
        self.oar_cleanup(jobscopy)
        return True

    def run_specific_task(self, command):
        """Worker entry point on the cluster: process the configured jobs
        and/or sum the configured intermediate files, then store the result."""
        if (
            self.config.action != "process"
            or (not self.config.jobs and not self.config.sum)
            or command
        ):
            raise errors.SubprocessError(
                "invalid command, too many parameters or no jobs/sum given"
            )
        jobs = sum = space.EmptyVerse()
        if self.config.jobs:
            jobs = space.verse_sum(
                self.send(self.main.process_job(job) for job in self.config.jobs)
            )
        if self.config.sum:
            # wait for each intermediate file to appear before summing it
            sum = space.chunked_sum(
                space.Multiverse.fromfile(src)
                for src in util.yield_when_exists(self.config.sum)
            )
        self.config.destination.store(jobs + sum)

    # calling OAR
    @staticmethod
    def subprocess_run(*command):
        """Run *command*; return (returncode, combined stdout+stderr text).

        BUGFIX: universal_newlines=True makes communicate() return str
        instead of bytes on Python 3; the callers (oarsub, oarstat) split
        the output on "\\n" and compare against str literals, which would
        raise TypeError on bytes output.
        """
        process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
        )
        output, unused_err = process.communicate()
        retcode = process.poll()
        return retcode, output

    def oarsub(self, *args):
        """Submit a job with oarsub; return its OAR job id as a str, or
        False if the id could not be determined."""
        command = "{0} process {1}".format(self.config.executable, " ".join(args))
        ret, output = self.subprocess_run(
            "oarsub", "-l {0}".format(self.config.oarsub_options), command
        )
        if ret == 0:
            lines = output.split("\n")
            for line in lines:
                if line.startswith("OAR_JOB_ID="):
                    void, jobid = line.split("=")
                    util.status(
                        "{0}: Launched job {1}".format(time.ctime(), jobid)
                    )
                    return jobid.strip()
        return False

    def oarstat(self, jobid):
        """Return the OAR status string for *jobid* ('Running', 'Waiting',
        ...), or 'Unknown' when the job is not listed in the output.

        NOTE(review): when the oarstat command itself fails (ret != 0) this
        falls through and returns None, which oarwait treats as finished —
        confirm that is intended.
        """
        # sample output:
        # % oarstat -s -j 5651374
        # 5651374: Running
        ret, output = self.subprocess_run("oarstat", "-s", "-j", str(jobid))
        if ret == 0:
            for n in output.split("\n"):
                if n.startswith(str(jobid)):
                    job, status = n.split(":")
                    return status.strip()
            else:
                # loop never breaks, so this runs when no line matched
                return "Unknown"

    def oarwait(self, jobs, remaining=0):
        """Poll oarstat until at most *remaining* of *jobs* are still
        pending; finished jobs are removed from the list in place."""
        if len(jobs) > remaining:
            util.status(
                "{0}: getting status of {1} jobs...".format(time.ctime(), len(jobs))
            )
        else:
            return
        delay = util.loop_delayer(30)
        while len(jobs) > remaining:
            next(delay)
            i = 0
            R = 0  # running
            W = 0  # waiting / about to launch
            U = 0  # unknown to oarstat
            while i < len(jobs):
                state = self.oarstat(jobs[i])
                if state == "Running":
                    R += 1
                elif state in ("Waiting", "toLaunch", "Launching"):
                    W += 1
                elif state == "Unknown":
                    U += 1
                else:  # assume state == 'Finishing' or 'Terminated'
                    # but don't wait on something unknown
                    del jobs[i]
                    i -= 1  # otherwise it skips a job
                i += 1
            util.status(
                "{0}: {1} jobs to go. {2} waiting, {3} running, {4} unknown.".format(
                    time.ctime(), len(jobs), W, R, U
                )
            )
        util.statuseol()

    def oar_cleanup(self, jobs):
        """Remove job config and intermediate files, then report any
        non-empty OAR stderr logs left behind by the given job ids."""
        for f in itertools.chain(self.configfiles, self.intermediates):
            try:
                os.remove(f)
            except Exception as e:
                # best-effort cleanup: report but keep going
                print("unable to remove {0}: {1}".format(f, e))
        errorfn = []
        for jobid in jobs:
            errorfilename = "OAR.{0}.stderr".format(jobid)
            if os.path.exists(errorfilename):
                with open(errorfilename, "r") as fp:
                    errormsg = fp.read()
                if len(errormsg) > 0:
                    errorfn.append(errorfilename)
                    print(
                        "Critical error: OAR Job {0} failed with the following error: \n{1}".format(
                            jobid, errormsg
                        )
                    )
        if len(errorfn) > 0:
            print(
                "Warning! {0} job(s) failed. See above for the details or the error log files: {1}".format(
                    len(errorfn), ", ".join(errorfn)
                )
            )