Source code for monty.subprocess

"""
Calling shell processes.
"""
import shlex
import threading
import traceback
from subprocess import PIPE, Popen

from .string import is_string

__author__ = "Matteo Giantomass"
__copyright__ = "Copyright 2014, The Materials Virtual Lab"
__version__ = "0.1"
__maintainer__ = "Matteo Giantomassi"
__email__ = "gmatteo@gmail.com"
__date__ = "10/26/14"


[docs]class Command: """ Enables to run subprocess commands in a different thread with TIMEOUT option. Based on jcollado's solution: http://stackoverflow.com/questions/1191374/subprocess-with-timeout/4825933#4825933 and https://gist.github.com/kirpit/1306188 .. attribute:: retcode Return code of the subprocess .. attribute:: killed True if subprocess has been killed due to the timeout .. attribute:: output stdout of the subprocess .. attribute:: error stderr of the subprocess Example: com = Command("sleep 1").run(timeout=2) print(com.retcode, com.killed, com.output, com.output) """ def __init__(self, command): """ :param command: Command to execute """ if is_string(command): command = shlex.split(command) self.command = command self.process = None self.retcode = None self.output, self.error = "", "" self.killed = False def __str__(self): return f"command: {self.command}, retcode: {self.retcode}"
[docs] def run(self, timeout=None, **kwargs): """ Run a command in a separated thread and wait timeout seconds. kwargs are keyword arguments passed to Popen. Return: self """ def target(**kw): try: # print('Thread started') with Popen(self.command, **kw) as self.process: self.output, self.error = self.process.communicate() self.retcode = self.process.returncode # print('Thread stopped') except Exception: self.error = traceback.format_exc() self.retcode = -1 # default stdout and stderr if "stdout" not in kwargs: kwargs["stdout"] = PIPE if "stderr" not in kwargs: kwargs["stderr"] = PIPE # thread thread = threading.Thread(target=target, kwargs=kwargs) thread.start() thread.join(timeout) if thread.is_alive(): # print("Terminating process") self.process.terminate() self.killed = True thread.join() return self