Source code for sarge

# -*- coding: utf-8 -*-
#
# Copyright (C) 2012-2022 Vinay M. Sajip. See LICENSE for licensing information.
#
# sarge: Subprocess Allegedly Rewards Good Encapsulation :-)
#
import errno
from io import BytesIO
import logging
import os

try:
    import queue
except ImportError:  # pragma: no cover
    import Queue as queue
import re
import shutil
import signal
import string
import subprocess
import sys
import threading

try:
    from logging import NullHandler
except ImportError:  # pragma: no cover

    class NullHandler(logging.Handler):

        def handle(self, record):
            pass

        emit = handle

        def createLock(self):
            self.lock = None


from .shlext import shell_shlex

__all__ = ('shell_quote', 'Capture', 'Command', 'ShellFormatter', 'Pipeline', 'Feeder',
           'shell_format', 'run', 'parse_command_line', 'capture_stdout',
           'capture_stderr', 'capture_both', 'get_stdout', 'get_stderr', 'get_both')

__version__ = '0.1.8.dev0'
__date__ = '2021-12-11'

logger = logging.getLogger(__name__)
logger.addHandler(NullHandler())
del NullHandler

# We use a separate logger for parsing, as that's sometimes too much
# information :-)
parse_logger = logging.getLogger('%s.parse' % __name__)
#
# This runs on Python 2.x and 3.x from the same code base - no need for 2to3.
#
if sys.version_info[0] < 3:  # pragma: no cover
    PY3 = False
    text_type = unicode
    binary_type = str
    string_types = basestring,
    _wait_has_timeout = False
else:  # pragma: no cover
    PY3 = True
    text_type = str
    binary_type = bytes
    string_types = str,
    basestring = str
    _wait_has_timeout = sys.version_info[:2] >= (3, 3)

# This regex determines which shell input needs quoting
# because it may be unsafe
UNSAFE = re.compile(r'[^\w%+,./:=@-]')


def shell_quote(s):
    """
    Quote text so that it is safe for POSIX command shells.

    For example, "*.py" would be converted to "'*.py'". If the text is
    considered safe it is returned unquoted.

    Args:
        s (str): The value to quote.

    Returns:
        str: A safe version of the input, from the point of view of POSIX
            command shells.
    """
    assert isinstance(s, string_types)
    if not s:
        result = "''"
    elif not UNSAFE.search(s):
        result = s
    else:
        result = "'%s'" % s.replace("'", r"'\''")
    return result

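# Illustrative sketch, not part of the module; behaviour follows the code
# above:
#
#     shell_quote('abc')     # -> 'abc' (safe, returned unquoted)
#     shell_quote('*.py')    # -> "'*.py'" (glob character, so quoted)
#     shell_quote("don't")   # -> "'don'\\''t'" (embedded quote escaped)
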
class ShellFormatter(string.Formatter):
    """
    This class overrides :class:`string.Formatter` to provide a custom
    `convert_field()` method, which ensures that fields are quoted for
    safety using `shell_quote()`.
    """

    def convert_field(self, value, conversion):
        """
        Convert a field to text.

        If a conversion is specified (e.g. !s, !r), no quoting is performed.
        If *no* conversion is specified, the value is converted to string
        (using `str()`) and that value is quoted using `shell_quote()`
        before being returned.

        Args:
            value (Any): The value to be converted.
            conversion (str|None): The conversion to apply.

        Returns:
            str: The converted value.
        """
        if conversion is None:
            result = shell_quote(str(value))
        else:
            result = super(ShellFormatter, self).convert_field(value,
                                                               conversion)
        return result

def shell_format(fmt, *args, **kwargs):
    """
    Format a shell command with format placeholders and variables to fill
    those placeholders.

    Note: you must specify positional parameters explicitly, i.e. as {0},
    {1} instead of {}, {}. Requiring the formatter to maintain its own
    counter can lead to thread safety issues unless a thread local is used
    to maintain the counter. It's not that hard to specify the values
    explicitly yourself :-)

    Args:
        fmt (str): The shell command as a format string. Note that you will
            need to double up braces you want in the result, i.e. { -> {{
            and } -> }}, due to the way `str.format()` works.
        args (list): Positional arguments for use with ``fmt``.
        kwargs (dict): Keyword arguments for use with ``fmt``.

    Returns:
        str: The formatted shell command, which should be safe for use in
            shells from the point of view of shell injection.
    """
    return ShellFormatter().vformat(fmt, args, kwargs)

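# Illustrative sketch, not part of the module: placeholders are quoted via
# shell_quote unless a conversion such as !s or !r is given explicitly:
#
#     shell_format('ls -l {0}', '*.py')    # -> "ls -l '*.py'"
#     shell_format('echo {0!s}', '*.py')   # -> 'echo *.py' (no quoting)
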
class WithMixin(object):
    """
    This class provides a very simple mixin for objects which can be used
    in a ``with`` statement.
    """

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()


default_capture_timeout = 0.02
default_expect_timeout = 5.0

class Capture(WithMixin):
    """
    This class encapsulates an output stream of a sub-process. You just set
    ``stdout`` or ``stderr`` of a :class:`Command` or :class:`Pipeline` to
    an instance of this class.
    """
    counter = 1

    # These are needed to allow wrapping using TextIOWrapper
    readable = lambda self: True
    writable = lambda self: False
    seekable = readable
    closed = False

    def __init__(self, timeout=None, buffer_size=-1, encoding='utf-8'):
        """
        Create a new instance.

        Args:
            timeout (float): The timeout to use for this instance. If not
                specified, the module attribute ``default_capture_timeout``
                is used.
            buffer_size (int): The buffer size to use when reading from
                streams. If not specified, a 4K buffer is used.
            encoding (str): The encoding to use.
        """
        self.timeout = timeout or default_capture_timeout
        self.streams = []
        self.buffer = queue.Queue()
        self.buffer_size = buffer_size or 4096
        self.encoding = encoding
        self.current = None
        self._bytes = None
        self.threads = []
        self.pattern = None
        self.matched = threading.Event()
        self.match = None
        self.match_index = 0
        self.counter = self.__class__.counter
        self.__class__.counter += 1

    def add_stream(self, stream):
        """
        Add a stream to this instance. A new thread is spawned to read from
        the stream into the capture queue for this instance.

        Args:
            stream (file): An output stream from a child process (i.e. the
                read end of a pipe, whose write end is the output stream
                from the process).
        """
        self.streams.append(stream)
        ready = threading.Event()
        t = threading.Thread(target=self.reader, args=(stream, ready))
        logger.debug('Created thread %s as reader for %r', t.name, self)
        self.threads.append(t)
        t.daemon = True
        t.start()
        logger.debug('%r: reader thread kicked off, waiting start', self)
        ready.wait()
        logger.debug('%r: reader thread now started', self)

    def reader(self, stream, ready):
        """
        The callable used as the runnable in reader threads.

        Args:
            stream (file): The stream to read.
            ready (threading.Event): An Event instance to set when the
                reader thread starts executing.
        """
        ready.set()
        chunk_size = self.buffer_size
        if chunk_size > 0:
            logger.debug('%r: reader thread about to read %s', self,
                         chunk_size)
        else:
            logger.debug('%r: reader thread about to read line', self)
        self._done = False
        while not self._done:
            if chunk_size < 0:
                chunk = stream.readline()
            else:
                chunk = stream.read(chunk_size)
            if chunk:
                self.buffer.put_nowait(chunk)
                logger.debug('queued chunk of length %d to %s: %r',
                             len(chunk), self.buffer, chunk[:30])
                if self.pattern and not self.matched.is_set():
                    self._try_match()
            if chunk_size > 0:
                if len(chunk) < chunk_size:
                    break
            else:
                if not chunk:
                    break
        logger.debug('%r: finished reading stream %s', self, stream)
        stream.close()

    @property
    def bytes(self):
        """
        All the bytes in the capture queue.
        """
        data = self.read()
        if self._bytes is not None:
            data = self._bytes + data
        self._bytes = data
        return data

    @property
    def text(self):
        """
        All the bytes in the capture queue, decoded as text.
        """
        return self.bytes.decode(self.encoding)

    def streams_open(self):
        result = False
        for c in self.streams:
            if not c.closed:
                result = True
                break
        return result

    def read(self, size=-1, block=True, timeout=None):
        """
        Read some bytes from this instance.

        Args:
            size (int): The number of bytes to read. If less than zero, the
                reading continues until there is no more data.
            block (bool): If ``True``, don't return until the required bytes
                are available.
            timeout (float): The timeout in seconds.

        Returns:
            bytes: The bytes requested.
        """
        if not self.streams_open():
            block = False
            timeout = None
        else:
            timeout = timeout or self.timeout
        if size <= 0:
            b = []
            while True:
                try:
                    b.append(self.buffer.get(block, timeout))
                except queue.Empty:
                    break
            result = b''.join(b)
        else:
            if self.current is None:
                try:
                    self.current = self.buffer.get(block, timeout)
                except queue.Empty:
                    self.current = b''
            while not self.current:
                try:
                    self.current += self.buffer.get(block, timeout)
                except queue.Empty:
                    break
            if len(self.current) <= size:
                result = self.current
                self.current = None
            else:
                result = self.current[:size]
                self.current = self.current[size:]
        return result

    def read1(self, n):
        return self.read(n)

    def readline(self, size=-1, block=True, timeout=None):
        """
        Read a line from this instance, optionally up to a certain size.

        Args:
            size (int): If specified as greater than zero, this many bytes
                are returned even if not a complete line.
            block (bool): If ``True``, don't return until the required bytes
                are available.
            timeout (float): The timeout in seconds.
        """
        if not self.streams_open():
            block = False
            timeout = None
        else:
            timeout = timeout or self.timeout
        if self.current is None:
            try:
                self.current = self.buffer.get(block, timeout)
            except queue.Empty:
                self.current = b''
        while b'\n' not in self.current:
            try:
                self.current += self.buffer.get(block, timeout)
            except queue.Empty:
                break
        if b'\n' not in self.current:
            result = self.current
            self.current = None
        else:
            i = self.current.index(b'\n')
            if 0 < size < i:
                i = size - 1
            result = self.current[:i + 1]
            self.current = self.current[i + 1:]
        return result

    def readlines(self, sizehint=-1, block=True, timeout=None):
        """
        Read multiple lines from this instance.

        Args:
            sizehint (int): A suggested number of bytes to read.
            block (bool): If ``True``, don't return until the required bytes
                are available.
            timeout (float): The timeout in seconds.
        """
        if not self.streams_open():
            block = False
            timeout = None
        else:
            timeout = timeout or self.timeout
        data = self.read(sizehint, block, timeout)
        if self.current is not None:
            data = self.current + data
            self.current = None
        return data.splitlines(True)

    def _try_match(self):
        data = self.bytes
        if data and self.pattern:
            m = self.pattern.search(data, self.match_index)
            if m:
                logger.debug('Found at %s after %d bytes', m.span(),
                             len(data))
                self.match = m
                self.match_index = m.end()
                self.matched.set()

    def expect(self, pattern, timeout=None):
        """
        Wait for a pattern to appear in the captured output.

        Args:
            pattern (str|bytes): The pattern to wait for.
            timeout (float): The timeout in seconds.

        Returns:
            The match object if the pattern was found within the timeout,
            else ``None``.
        """
        def as_pattern(p):
            if isinstance(p, string_types):
                if isinstance(p, text_type):
                    p = p.encode('utf-8')
                p = re.compile(p, re.MULTILINE)
            return p

        self.pattern = as_pattern(pattern)
        self.matched.clear()
        self.match = None
        self._try_match()
        if not self.match:
            if timeout is None:
                timeout = default_expect_timeout
            self.matched.wait(timeout)
        return self.match

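    # Illustrative sketch, not part of the module; 'my_prog' and the pattern
    # are hypothetical. A line-buffered Capture (buffer_size=-1) works best
    # here, since the reader thread then queues complete lines for matching:
    #
    #     p = run('my_prog', stdout=Capture(buffer_size=-1), async_=True)
    #     m = p.stdout.expect(r'^ready$')  # match object, or None on timeout
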
    def __iter__(self):
        while True:
            line = self.readline()
            if not line:
                break
            yield line

    def close(self, stop_threads=False):
        """
        Close this instance. Once this is done, no attempts should be made
        to read from it. We don't mark it as closed, so that a TextIOWrapper
        wrapping it won't fail before all the input has been read.

        Args:
            stop_threads (bool): If ``True``, the threads populating this
                instance are asked to stop.
        """
        if stop_threads:
            self._done = True  # may lose some data sent from subprocess
        for t in self.threads:
            try:
                t.join()
            except RuntimeError:  # pragma: no cover
                logger.debug('failed to join thread: %s', t)
                # raise

    def __repr__(self):  # pragma: no cover
        return '%s-%d' % (self.__class__.__name__, self.counter)

class Feeder(object):
    """
    Facilitate sending data to a child process over time, rather than just
    when the child is spawned.
    """

    def __init__(self):
        self._r, self._w = os.pipe()

    def fileno(self):
        return self._r

    def feed(self, data):
        if isinstance(data, text_type):
            data = data.encode('utf-8')
        if not isinstance(data, bytes):
            raise TypeError('Bytes expected, got %s' % type(data))
        os.write(self._w, data)

    def close(self):
        if self._r:
            os.close(self._r)
            self._r = None
        if self._w:
            os.close(self._w)
            self._w = None


def ensure_stream(input, encoding='utf-8'):
    """
    Convert a possible text value into a binary file-like object.
    """
    if isinstance(input, text_type):
        input = input.encode(encoding)
    if isinstance(input, binary_type):  # need to be explicit for 2.x!
        input = BytesIO(input)
    logger.debug('returning %s', input)
    return input


# Used to redirect stdout to stderr. Use a value less likely to clash with
# future additions to subprocess.py.
STDERR = -9

#
# A dummy redirects dict which indicates a desire to swap stdout and stderr
# in the child.
#
SWAP_OUTPUTS = {
    1: ('>', ('&', 2)),
    2: ('>', ('&', 3)),
    3: ('>', ('&', 1)),
}

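# Illustrative sketch of Feeder usage, not part of the module ('cat' simply
# echoes what it is fed; run() is defined later in this module):
#
#     feeder = Feeder()
#     p = run('cat', input=feeder, async_=True)
#     feeder.feed('hello\n')
#     feeder.feed('world\n')
#     feeder.close()
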
class Popen(subprocess.Popen):
    """
    This is a subclass of :class:`subprocess.Popen` which is there in case
    we need to provide specialised functionality for use in sarge. For
    example, we can't do >&2 redirection in subprocess.Popen, though we can
    do 2>&1.
    """

    def _get_handles(self, stdin, stdout, stderr):

        def close(h):
            if h not in (-1, None):
                if hasattr(h, 'Close'):
                    h.Close()
                else:
                    os.close(h)

        def dup(h):
            if hasattr(self, '_make_inheritable'):
                result = self._make_inheritable(h)
            else:
                result = os.dup(h)
            return result

        if stdout == STDERR and stderr == subprocess.STDOUT:
            # Issue #15: Following Python issue #18851, a change was made
            # to how Popen._get_handles works, which surfaced in Python
            # 2.7.6. Instead of returning a 6-tuple of handles as it did in
            # earlier versions, a 2-tuple is returned with the 6-tuple as
            # first element and a set as a second element. At this time,
            # only Python 2 appears to have this change. To handle this, we
            # check for a 2-tuple return and act accordingly.
            PIPE = subprocess.PIPE
            t = super(Popen, self)._get_handles(stdin, PIPE, PIPE)
            # logger.debug('Base _get_handles returned %s', t)
            nreturned = len(t)
            if nreturned == 2:
                p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite = t[0]
                to_close = t[1]
            else:
                p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite = t
            logger.debug('swapping stdout and stderr')
            if nreturned == 2:
                # p2cread, c2pwrite and errwrite are closed in the parent.
                # So we should add them to to_close, as subprocess will
                # expect that.
                for h in (p2cread, c2pwrite, errwrite):
                    if h is not None:
                        to_close.add(h)
                result = (p2cread, p2cwrite, errread, errwrite, c2pread,
                          c2pwrite), to_close
            else:
                result = (p2cread, p2cwrite, errread, errwrite, c2pread,
                          c2pwrite)
            # logger.debug('Our _get_handles returned %s', result)
            return result
        else:
            orig_stdout = stdout
            if stdout == STDERR:
                stdout = None
            t = super(Popen, self)._get_handles(stdin, stdout, stderr)
            # logger.debug('Base _get_handles returned %s', t)
            nreturned = len(t)
            if nreturned == 2:
                p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite = t[0]
                to_close = t[1]
            else:
                p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite = t
            if orig_stdout == STDERR:
                # c2pread, c2pwrite are None on 2.x and -1 on 3.x
                close(c2pread)
                close(c2pwrite)
                c2pread = dup(errread)
                c2pwrite = dup(errwrite)
            if nreturned == 2:
                # p2cread, c2pwrite and errwrite are closed in the parent.
                # So we should add them to to_close, as subprocess will
                # expect that.
                for h in (p2cread, c2pwrite, errwrite):
                    if h is not None:
                        to_close.add(h)
                result = (p2cread, p2cwrite, c2pread, c2pwrite, errread,
                          errwrite), to_close
            else:
                result = (p2cread, p2cwrite, c2pread, c2pwrite, errread,
                          errwrite)
            # logger.debug('Our _get_handles returned %s', result)
            return result

    if os.name == 'posix' and sys.version_info[0] < 3:
        # Issue #12: add restore_signals support to avoid spurious
        # output on broken pipes
        def _execute_child(self, args, executable, preexec_fn, *rest):
            # We can only call signal.signal in the main thread. Note that
            # this test may give the wrong answer if called from a child
            # process which was fork()ed from a non-main thread, as that
            # becomes the main thread in the child process and will not be
            # a _MainThread instance.
            if not isinstance(threading.current_thread(),
                              threading._MainThread):
                preexec = preexec_fn
            else:
                def preexec():
                    signal.signal(signal.SIGPIPE, signal.SIG_DFL)
                    if preexec_fn:
                        preexec_fn()
            # logger.debug('Calling Base _execute_child: %s',
            #              ((args, executable, preexec, rest),))
            super(Popen, self)._execute_child(args, executable, preexec,
                                              *rest)

    def __repr__(self):  # pragma: no cover
        values = []
        for attr in ('returncode', 'stdin', 'stdout', 'stderr'):
            values.append('%s=%s' % (attr, getattr(self, attr, None)))
        return '%s(%s)' % (self.__class__.__name__, ' '.join(values))

def copier(src, dest):
    shutil.copyfileobj(src, dest)
    dest.close()

class Command(object):
    """
    This class represents a shell command to be run in a subprocess.

    Args:
        args (str|list[str]): The command string, or array of command/args,
            to be run.
        kwargs (dict): The same as you would pass to `subprocess.Popen`.
            However, the ``env`` parameter is handled differently: it is
            treated as a set of *additional* environment variables to be
            added to the values in ``os.environ``, unless the
            ``replace_env`` keyword argument is present and truthy, in
            which case the env value is used *in place of* ``os.environ``.

    .. versionadded:: 0.1.6
       The ``replace_env`` keyword argument was added.
    """

    def __init__(self, args, **kwargs):
        replace_env = kwargs.pop('replace_env', False)
        shell = kwargs.get('shell')
        if not shell and isinstance(args, string_types):
            args = list(shell_shlex(args, control='();>|&'))
        self.args = args
        self.kwargs = kwargs
        # check for input specified
        if kwargs.get('stdin'):
            raise ValueError('Inputs need to be specified via the run '
                             'method.')
        # special handling of Capture instances in stdout, stderr
        for attr in ('stdout', 'stderr'):
            s = kwargs.get(attr)
            if isinstance(s, Capture):
                kwargs[attr] = subprocess.PIPE
                setattr(self, attr, s)
        # special handling: env is added to os.environ
        e = kwargs.get('env')
        if e:
            if replace_env:
                env = e
            else:
                env = dict(os.environ)
                env.update(e)
            kwargs['env'] = env
        self.process_ready = threading.Event()
        self.process = None
        self.exception = None
        logger.debug('%r created', self)

    def __repr__(self):  # pragma: no cover
        if isinstance(self.args, basestring):
            s = self.args
        else:
            s = ' '.join(self.args)
        return '%s(%r)' % (self.__class__.__name__, s)

    def run(self, input=None, async_=False):
        """
        Run the command with optional input, either synchronously or
        asynchronously.

        Args:
            input (str|bytes|file): The input to pass to the command
                subprocess. If this is text, it is encoded to bytes using
                UTF-8. If it is a byte string, it is used as is. Otherwise,
                a file-like object containing bytes should be passed: this
                will be read to the end, but not closed.
            async_ (bool): If ``True``, this method returns without waiting
                for the subprocess to complete. Otherwise, it awaits
                completion by calling the `subprocess.Popen.wait()` method.
        """
        # noinspection PyBroadException
        if input is None:
            self.kwargs['stdin'] = None
        else:
            input = ensure_stream(input)
            if not isinstance(input, BytesIO):
                if hasattr(input, 'fileno'):
                    input = input.fileno()
                self.kwargs['stdin'] = input
            else:
                self.kwargs['stdin'] = subprocess.PIPE
        logger.debug('About to call Popen: %s, %s', self.args, self.kwargs)
        try:
            self.process = p = Popen(self.args, **self.kwargs)
        except (OSError, Exception) as e:  # pragma: no cover
            self.process_ready.set()
            if isinstance(e, OSError) and e.errno == errno.ENOENT:
                ve = ValueError('Command not found: %s' % self.args[0])
                self.exception = ve
                logger.exception('Popen call failed: %s: %s', type(ve), ve)
                raise ve
            logger.exception('Popen call failed: %s: %s', type(e), e)
            self.exception = e
            raise
        self.stdin = p.stdin
        logger.debug('Popen: %s, %s -> %s', self, self.kwargs, p.__dict__)
        if isinstance(input, BytesIO):
            t = threading.Thread(target=copier, args=(input, p.stdin))
            t.daemon = True
            t.start()
            logger.debug('copier thread started: %s', t.name)
            # The thread may take a while to finish, but we want to wait
            # until it's on its way; otherwise, any pipeline dependent on
            # this input could be dead-locked. Possibly a better mechanism
            # than just waiting awhile is needed. I've tried a
            # threading.Event passed in from here which gets set after a
            # chunk has been written in the copier and which we wait for
            # here, but that doesn't seem to work reliably.
            t.join(0.0001)
        for attr in ('stdout', 'stderr'):
            s = getattr(self, attr, None)
            if isinstance(s, Capture):
                s.add_stream(getattr(p, attr))
        self.process_ready.set()
        if not async_:
            logger.debug('about to wait for process %s', self)
            p.wait()
        logger.debug('returning %s (%s)', self, self.process)
        return self

    def wait(self, timeout=None):
        """
        Wait for a command's underlying sub-process to complete.

        Args:
            timeout (float): How many seconds to wait. This parameter only
                applies for Python >= 3.3 and has no effect otherwise.
        """
        self.process_ready.wait()
        p = self.process
        if not p:  # pragma: no cover
            logger.warning('No process found for %s', self)
            result = None
        else:
            if _wait_has_timeout:
                result = p.wait(timeout)
            else:
                result = p.wait()
        return result

    def terminate(self):
        """
        Terminate a command's underlying subprocess.

        .. versionadded:: 0.1.1
        """
        self.process_ready.wait()
        p = self.process
        if not p:  # pragma: no cover
            raise ValueError('There is no subprocess')
        p.terminate()

    def kill(self):
        """
        Kill a command's underlying subprocess.

        .. versionadded:: 0.1.1
        """
        self.process_ready.wait()
        p = self.process
        if not p:  # pragma: no cover
            raise ValueError('There is no subprocess')
        p.kill()

    def poll(self):
        """
        Poll a command's underlying subprocess.

        .. versionadded:: 0.1.1
        """
        self.process_ready.wait()
        p = self.process
        if not p:  # pragma: no cover
            raise ValueError('There is no subprocess')
        return p.poll()

    @property
    def returncode(self):
        self.process_ready.wait()
        return self.process.returncode if self.process else None

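# Illustrative sketch, not part of the module (assumes a POSIX system where
# 'echo' is available): a Capture passed as stdout is wired to a pipe and
# kept on the command, so its output can be read after running:
#
#     cmd = Command('echo foo', stdout=Capture())
#     cmd.run()          # waits, since async_ defaults to False
#     cmd.stdout.text    # -> 'foo\n'
#     cmd.returncode     # -> 0
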
class Node(object):
    """
    This class represents a node in the AST built while parsing command
    lines. It's basically an object container for various attributes, with
    a slightly specialised representation to make it a little easier to
    debug the parser.
    """

    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)

    def __repr__(self):  # pragma: no cover
        chunks = []
        d = dict(self.__dict__)
        kind = d.pop('kind')
        for k, v in sorted(d.items()):
            chunks.append('%s=%s' % (k, v))
        return '%sNode(%s)' % (kind.title(), ' '.join(chunks))


class CommandLineParser(object):
    """
    This class implements a fairly unsophisticated recursive descent parser
    for shell command lines as used in sh, bash and dash. (On Windows, the
    cmd.exe command shell has only limited compatibility.)
    """

    permitted_tokens = ('&&', '||', '|&', '>>')

    def next_token(self):
        t = self.lex.get_token()
        if not t:
            tt = None
        else:
            tt = self.lex.token_type
            if tt in ('"', "'"):
                tt = 'word'
                t = t[1:-1]
            elif tt == 'a':
                try:
                    int(t)
                    tt = 'number'
                except ValueError:
                    tt = 'word'
            elif tt == 'c':
                # the shlex parser will return arbitrary runs of 'control'
                # characters, but only some runs will be valid for us. We
                # split into the valid runs and push all the others back,
                # keeping just one. Since all will have a token_type of
                # 'c', we don't need to worry about pushing the type back.
                if len(t) > 1:
                    valid = self.get_valid_controls(t)
                    t = valid.pop(0)
                    if valid:
                        for other in reversed(valid):
                            self.lex.push_token(other)
                tt = t
        return tt, t, self.lex.preceding

    def get_valid_controls(self, t):
        if len(t) == 1:
            result = [t]
        else:
            result = []
            last = None
            for c in t:
                if last is not None:
                    combined = last + c
                    if combined in self.permitted_tokens:
                        result.append(combined)
                    else:
                        result.append(last)
                        result.append(c)
                    last = None
                elif c not in ('>', '&', '|'):
                    result.append(c)
                else:
                    last = c
            if last:
                result.append(last)
        # logger.debug('%s -> %s', t, result)
        return result

    def peek_token(self):
        if self.peek is None:
            self.peek = self.next_token()
        return self.peek[0]

    def consume(self, tt):
        self.token = self.peek
        self.peek = self.next_token()
        if self.token[0] != tt:
            raise ValueError('consume: expected %r' % tt)

    def parse(self, source, posix=None):
        self.source = source
        parse_logger.debug('starting parse of %r', source)
        if posix is None:
            posix = os.name == 'posix'
        self.lex = shell_shlex(source, posix=posix, control=True)
        self.token = None
        self.peek = None
        self.peek_token()
        result = self.parse_list()
        return result

    def parse_list(self):
        parts = [self.parse_pipeline()]
        tt = self.peek_token()
        while tt in (';', '&'):
            self.consume(tt)
            part = self.parse_pipeline()
            parts.append(Node(kind='sync', sync=tt))
            parts.append(part)
            tt = self.peek_token()
        if len(parts) == 1:
            node = parts[0]
        else:
            node = Node(kind='list', parts=parts)
        parse_logger.debug('returning %r', node)
        return node

    def parse_pipeline(self):
        parts = [self.parse_logical()]
        tt = self.peek_token()
        while tt in ('&&', '||'):
            self.consume(tt)
            part = self.parse_logical()
            parts.append(Node(kind='check', check=tt))
            parts.append(part)
            tt = self.peek_token()
        if len(parts) == 1:
            node = parts[0]
        else:
            node = Node(kind='pipeline', parts=parts)
        parse_logger.debug('returning %r', node)
        return node

    def parse_logical(self):
        tt = self.peek_token()
        if tt == '(':
            self.consume(tt)
            node = self.parse_list()
            self.consume(')')
        else:
            parts = [self.parse_command()]
            tt = self.peek_token()
            while tt in ('|', '|&'):
                last_part = parts[-1]
                if ((tt == '|' and 1 in last_part.redirects) or
                        (tt == '|&' and 2 in last_part.redirects)):
                    if last_part.redirects != SWAP_OUTPUTS:
                        raise ValueError('semantics: cannot redirect and '
                                         'pipe the same stream')
                self.consume(tt)
                part = self.parse_command()
                parts.append(Node(kind='pipe', pipe=tt))
                parts.append(part)
                tt = self.peek_token()
            if len(parts) == 1:
                node = parts[0]
            else:
                node = Node(kind='logical', parts=parts)
        parse_logger.debug('returning %r', node)
        return node

    def add_redirection(self, node, fd, kind, dest):
        if fd in node.redirects:
            raise ValueError('semantics: cannot redirect stream %d twice' %
                             fd)
        node.redirects[fd] = (kind, dest)

    def parse_command(self):
        node = self.parse_command_part()
        tt = self.peek_token()
        while tt in ('word', 'number'):
            part = self.parse_command_part()
            node.command.extend(part.command)
            for fd, v in part.redirects.items():
                self.add_redirection(node, fd, v[0], v[1])
            tt = self.peek_token()
        parse_logger.debug('returning %r', node)
        if node.redirects != SWAP_OUTPUTS:
            d = dict(node.redirects)
            d.pop(1, None)
            d.pop(2, None)
            if d:
                raise ValueError('semantics: can only redirect stdout and '
                                 'stderr, not %s' % list(d.keys()))
        if sys.platform == 'win32':  # pragma: no cover
            from .utils import find_command
            cmd = find_command(node.command[0])
            if cmd:
                exe, cmd = cmd
                node.command[0] = cmd
                if exe:
                    node.command.insert(0, exe)
        return node

    def parse_command_part(self):
        node = Node(kind='command', command=[self.peek[1]], redirects={})
        if self.peek[0] == 'word':
            self.consume('word')
        else:
            self.consume('number')
        tt = self.peek_token()
        while tt in ('>', '>>'):
            num = 1  # default value
            if self.peek[2] == '':
                # > or >> seen without preceding whitespace. So see if the
                # last token is a positive integer. If it is, assume it's
                # an fd to redirect and pop it, else leave it in as part of
                # the command line.
                try:
                    try_num = int(node.command[-1])
                    if try_num > 0:
                        num = try_num
                        node.command.pop()
                except ValueError:
                    pass
            redirect_kind = tt
            self.consume(tt)
            tt = self.peek_token()
            if tt not in ('word', '&'):
                raise ValueError('syntax: expecting filename or &')
            if tt == 'word':
                redirect_target = self.peek[1]
                self.consume(tt)
            else:
                self.consume('&')
                if self.peek_token() != 'number':
                    raise ValueError('syntax: number expected after &')
                n = int(self.peek[1])
                redirect_target = ('&', n)
                self.consume('number')
            self.add_redirection(node, num, redirect_kind, redirect_target)
            tt = self.peek_token()
        parse_logger.debug('returning %r', node)
        return node

class Pipeline(WithMixin):
    """
    This class represents a pipeline of commands.
    """

    def __init__(self, source, posix=None, **kwargs):
        """
        Initialize a new instance.

        Args:
            source (str|list|tuple): The command line.
            posix (bool): Whether POSIX conventions are used in the lexer.
            kwargs (dict): Whatever you might pass to `subprocess.Popen`.
        """
        if posix is None:
            posix = os.name == 'posix'
        is_shell = kwargs.get('shell', False)
        if isinstance(source, (list, tuple)) or is_shell:
            if is_shell:
                self.source = source
            else:
                self.source = ' '.join(source)
            t = Node(kind='command', command=source, redirects={})
        else:
            self.source = source
            t = CommandLineParser().parse(source, posix=posix)
        logger.debug('command tree: %s', t)
        self.tree = t
        self.last = self.find_last_command(t)
        self.events = []
        self.kwargs = kwargs
        self.stdout = kwargs.pop('stdout', None)
        self.stderr = kwargs.pop('stderr', None)
        self.lock = threading.RLock()
        self.commands = []

    def find_last_command(self, node):
        """
        Find the last command node in a parse sub-tree.

        Args:
            node (Node): The root of the sub-tree to search.
        """
        if not hasattr(node, 'parts'):
            result = node
        else:
            result = self.find_last_command(node.parts[-1])
        assert result.kind == 'command'
        return result

    def run_node_in_thread(self, node, input, async_):
        """
        Run a node in a separate thread. A thread is created, and the
        `run_node()` method is run with the specified arguments in that
        thread.

        Args:
            node (Node): The node to run.
            input (str|bytes): The data to pass to the node's command.
            async_ (bool): This is passed to `run_node()`.
        """
        # When the node is run in a separate thread, we need a sync point
        # for when all the commands have been created for that node - even
        # when there are delays because of e.g. sleep commands or other
        # time-consuming commands. That's what these events are for -
        # they're set at the end of run_node, and waited on in the
        # pipeline's wait and run methods.
        e = threading.Event()
        with self.lock:
            self.events.append(e)
        t = threading.Thread(target=self.run_node,
                             args=(node, input, async_, e))
        t.daemon = True
        logger.debug('thread %s started to run node: %s', t.name, node)
        t.start()

    def run(self, input=None, async_=False):
        """
        Run the commands in the pipeline.

        Args:
            input (str|bytes|file): The data to pass to the command.
            async_ (bool): If ``True``, don't wait for the pipeline to
                complete before returning.
        """
        self.commands = []
        self.opened = []
        node = self.tree
        # Issue #20: run in thread if async
        if async_:
            self.run_node_in_thread(node, input, async_=True)
        else:
            self.run_node(node, input=input, async_=False)
        return self

    @property
    def returncode(self):
        """
        The return code of the last command to run, which is regarded as
        the overall result of the pipeline.
        """
        if self.commands:
            return self.commands[-1].process.returncode

    @property
    def processes(self):
        """
        A list of the :class:`subprocess.Popen` instances for all the
        commands which have been run.
        """
        result = []
        if self.commands:
            result = [c.process for c in self.commands]
        return result

    @property
    def returncodes(self):
        """
        A list of the return codes for all the commands which have been
        run.
        """
        result = []
        if self.commands:
            for c in self.commands:
                rc = None
                if c.process:
                    rc = c.process.poll()
                result.append(rc)
        return result

    @property
    def exceptions(self):
        """
        A list of any exceptions raised by commands which have been run.
        """
        result = [c.exception for c in self.commands if c]
        return result

    def wait_events(self):
        """
        Wait for all the events in the pipeline to be set.
        """
        for e in self.events:
            e.wait()

    def wait(self, timeout=None):
        """
        Wait for all the commands in the pipeline to complete.

        Args:
            timeout (float): The timeout in seconds. This parameter only
                applies for Python >= 3.3 and has no effect otherwise. It
                will be applied to each command in turn, so the effect
                could be cumulative.
        """
        logger.debug('pipeline waiting')
        self.wait_events()
        for cmd in self.commands:
            logger.debug('waiting for command %s', cmd)
            cmd.wait(timeout)

    def close(self):
        """
        Close the pipeline. This waits for all the commands in the pipeline
        to complete, but also closes all the opened streams once all the
        commands have completed.
        """
        logger.debug('pipeline closing')
        self.wait_events()
        for cmd in self.commands:
            cmd.wait()
            p = cmd.process
            if p is None:  # pragma: no cover
                continue
            for attr in ('stdout', 'stderr'):
                s = getattr(self, attr)
                if isinstance(s, Capture):
                    s.close()
            if p.stdout:
                p.stdout.close()
            if p.stderr:
                p.stderr.close()
        for stream in self.opened:
            stream.close()

    def poll_last(self):
        """
        Check if the last command to run has terminated, and return its
        exit code, if available.
        """
        if self.commands:
            return self.commands[-1].poll()

    def poll_all(self):
        """
        Check if all commands to run have terminated. Return a list of exit
        codes, where available.
        """
        result = []
        if self.commands:
            result = [c.poll() for c in self.commands]
        return result

    def run_node(self, node, input, async_, event=None):
        """
        This runs a single node in the parse tree.

        Args:
            node (Node): The node to run.
            input (str|bytes|file): The data to pass to the command.
            async_ (bool): If ``True``, don't wait for the pipeline to
                complete before returning.
            event (threading.Event): If specified, call
                `threading.Event.set()` on the event once the node has run.
        """
        kind = node.kind
        method = 'run_%s_node' % kind
        logger.debug('%s %s %s', method, async_, event)
        try:
            result = getattr(self, method)(node, input, async_)
            return result
        except Exception as e:
            logger.exception('Failed: %s', e)
            node.exception = e
            raise
        finally:
            if event:
                event.set()

    def new_command(self, args, **kwargs):
        """
        Create a new `Command` from the provided arguments, and append it
        to the list of commands.

        Args:
            args (list[str]): The command and arguments to be created.
        """
        cmd = Command(args, **kwargs)
        with self.lock:
            self.commands.append(cmd)
        return cmd

    def get_redirects(self, node):
        """
        Get the redirects for a node, if any.

        Args:
            node (Node): An AST node from the parser.

        Returns:
            tuple: The ``stdout`` and ``stderr`` redirect targets. If
                either of these is not specified, the corresponding value
                in the result will be ``None``.
        """
        stdout = stderr = None
        for fd, fs in node.redirects.items():
            pos, fn = fs
            if pos == '>':
                mode = 'wb'
            else:
                mode = 'ab'
            if isinstance(fn, string_types):
                # Issue 9: open redirection outputs relative to cwd
                if 'cwd' in self.kwargs:
                    fn = os.path.join(self.kwargs['cwd'], fn)
                stream = open(fn, mode)
                with self.lock:
                    self.opened.append(stream)
            elif fd == 1:
                assert fn == ('&', 2)
                stream = STDERR
            elif fd == 2:
                assert fn == ('&', 1)
                stream = subprocess.STDOUT
            if fd == 1:
                stdout = stream
            else:
                stderr = stream
        return stdout, stderr

    def run_logical_node(self, node, input, async_):
        """
        This runs a 'logical' node in the parse tree.

        Args:
            node (Node): The node to run.
            input (str|bytes|file): The data to pass to the command. Text
                will be encoded using UTF-8.
            async_ (bool): If ``True``, don't wait for the pipeline to
                complete before returning.
        """
        logger.debug('started: %s, %s, %s', node, input, async_)
        parts = node.parts
        last = len(parts) - 1
        assert last > 1
        prev = pipe = None
        i = 0
        while i <= last:
            curr = parts[i]
            if prev is None:
                if not input:
                    stdin = None
                else:
                    stdin = ensure_stream(input)
            else:
                if pipe == '|':
                    stdin = prev.process.stdout
                else:
                    stdin = prev.process.stderr
            if curr.redirects == SWAP_OUTPUTS:
                stdout = STDERR
                stderr = subprocess.STDOUT
            else:
                try:
                    stdout, stderr = self.get_redirects(curr)
                except IOError:  # pragma: no cover
                    if prev and stdin == prev.process.stdout:
                        stdin.close()
                    raise
            if i < last:
                pipe = parts[i + 1].pipe
                if pipe == '|':
                    assert stdout in (None, STDERR)
                    if stdout is None:
                        stdout = subprocess.PIPE
                else:
                    assert stderr in (None, subprocess.STDOUT)
                    if stderr is None:
                        stderr = subprocess.PIPE
                use_async = True
            else:
                if stdout == STDERR:
                    assert self.stdout is None
                use_async = async_
            curr.cmd = self.new_command(curr.command,
                                        stdout=stdout or self.stdout,
                                        stderr=stderr or self.stderr,
                                        **self.kwargs)
            curr.cmd.run(input=stdin, async_=use_async)
            # Issue 12: close stdin after spawning the child that uses it
            if prev and stdin == prev.process.stdout:
                stdin.close()
            prev = curr.cmd
            i += 2

    def run_command_node(self, node, input, async_):
        """
        This runs a 'command' node in the parse tree.

        Args:
            node (Node): The node to run.
            input (str|bytes|file): The data to pass to the command.
            async_ (bool): If ``True``, don't wait for the pipeline to
                complete before returning.
        """
        logger.debug('started: %s, %s, %s', node, input, async_)
        kwargs = dict(self.kwargs)
        stdout, stderr = self.get_redirects(node)
        if node != self.last:
            kwargs['stdout'] = stdout or self.stdout
            kwargs['stderr'] = stderr or self.stderr
        else:
            if self.stdout and stdout:
                raise ValueError('You cannot redirect one stream to two '
                                 'places')
            kwargs['stdout'] = self.stdout or stdout
            if self.stderr and stderr:
                raise ValueError('You cannot redirect one stream to two '
                                 'places')
            kwargs['stderr'] = self.stderr or stderr
        node.cmd = self.new_command(node.command, **kwargs)
        try:
            node.cmd.run(input=input, async_=async_)
        except Exception:
            from .utils import is_main_thread
            if is_main_thread():
                raise
            # if not the main thread, then the exception should have been
            # stored in the node, so just do nothing more
            assert node.cmd.exception is not None

    def get_status(self, node):
        """
        Get the return code for a node. For a node with multiple commands,
        the return code of the last command (as determined by
        `find_last_command()`) is returned.

        Args:
            node (Node): The node to query.

        Returns:
            int|None: The return code for the node.
        """
        if node.kind == 'command':
            last = node
        else:
            last = self.find_last_command(node)
        return last.cmd.process.returncode

    def run_pipeline_node(self, node, input, async_):
        """
        This runs a 'pipeline' node in the parse tree.

        Args:
            node (Node): The node to run.
            input (str|bytes|file): The data to pass to the command.
            async_ (bool): If ``True``, don't wait for the pipeline to
                complete before returning.
        """
        logger.debug('started: %s, %s, %s', node, input, async_)
        parts = node.parts
        last = len(parts) - 1
        assert last > 1
        prev = None
        i = 0
        while i <= last:
            curr = parts[i]
            if prev is not None:
                input = None
            elif input is not None:
                input = ensure_stream(input)
            # run the current command
            if i < last:
                # need to know the status, so run with async_=False
                use_async = False
            else:
                use_async = async_
            self.run_node(curr, input, async_=use_async)
            if i < last:
                check = parts[i + 1].check
                if check == '&&':
                    if self.get_status(curr) != 0:
                        break
                else:
                    if self.get_status(curr) == 0:
                        break
            prev = curr
            i += 2

    def run_list_node(self, node, input, async_):
        """
        This runs a 'list' node in the parse tree.

        Args:
            node (Node): The node to run.
            input (str|bytes|file): The data to pass to the command.
            async_ (bool): If ``True``, don't wait for the pipeline to
                complete before returning.
        """
        logger.debug('started: %s, %s, %s', node, input, async_)
        parts = node.parts
        last = len(parts) - 1
        assert last > 1
        prev = None
        i = 0
        while i <= last:
            curr = parts[i]
            if prev is not None:
                input = None
            elif input is not None:
                input = ensure_stream(input)
            if i < last:
                use_async = parts[i + 1].sync == '&'
            else:
                use_async = async_
            # run the current command
            if not use_async:
                self.run_node(curr, input, async_=use_async)
            else:
                self.run_node_in_thread(curr, input, async_=False)
            prev = curr
            i += 2

# Module-level convenience functions
def run(cmd, **kwargs):
    """
    Run a command with optional input, either synchronously or
    asynchronously.

    Apart from the ``input`` and ``async_`` keyword arguments described
    below, other keyword arguments are passed to the created
    :class:`Pipeline` instance, and thence to :class:`subprocess.Popen` via
    a :class:`Command` instance. Note that the ``env`` kwarg is treated
    differently to how it is in :class:`~subprocess.Popen`: it is treated
    as a set of *additional* environment variables to be added to the
    values in ``os.environ``.

    Args:
        cmd (str|list[str]): The command string, or array of command/args,
            to be run.
        input (str|bytes|file): The input to pass to the command
            subprocess.
        async_ (bool): If `True`, this function returns without waiting for
            the subprocess to complete. Otherwise, it awaits completion by
            calling the `subprocess.Popen.wait()` method.
    """
    input = kwargs.pop('input', None)
    async_ = kwargs.pop('async_', False)
    if async_:
        p = Pipeline(cmd, **kwargs)
        p.run(input=input, async_=True)
    else:
        with Pipeline(cmd, **kwargs) as p:
            p.run(input=input, async_=async_)
    return p

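# Illustrative sketch, not part of the module (POSIX commands assumed). The
# command line is parsed by CommandLineParser above, so |, &&, ; and
# redirections work without an intervening shell:
#
#     p = run('false && echo foo')   # 'echo foo' never runs
#     p.returncode                   # -> 1 (the status of false)
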
def capture_stdout(cmd, **kwargs):
    """
    This is the same as `run()`, but the ``stdout`` is captured. You can
    access it via the ``stdout`` attribute of the return value from this
    function.

    Args:
        cmd (str|list[str]): The command string, or array of command/args,
            to be run.
        input (str|bytes|file): The input to pass to the command
            subprocess.
        async_ (bool): If `True`, this function returns without waiting for
            the subprocess to complete. Otherwise, it awaits completion by
            calling the `subprocess.Popen.wait()` method.
    """
    kwargs['stdout'] = Capture()
    return run(cmd, **kwargs)


def capture_stderr(cmd, **kwargs):
    """
    This is the same as `run()`, but the ``stderr`` is captured. You can
    access it via the ``stderr`` attribute of the return value from this
    function.

    Args:
        cmd (str|list[str]): The command string, or array of command/args,
            to be run.
        input (str|bytes|file): The input to pass to the command
            subprocess.
        async_ (bool): If `True`, this function returns without waiting for
            the subprocess to complete. Otherwise, it awaits completion by
            calling the `subprocess.Popen.wait()` method.
    """
    kwargs['stderr'] = Capture()
    return run(cmd, **kwargs)

def capture_both(cmd, **kwargs):
    """
    This is the same as `run()`, but both ``stdout`` and ``stderr`` are
    captured. You can access these via the ``stdout`` and ``stderr``
    attributes of the return value from this function.

    Args:
        cmd (str|list[str]): The command string, or array of command/args,
            to be run.
        input (str|bytes|file): The input to pass to the command
            subprocess.
        async_ (bool): If `True`, this function returns without waiting for
            the subprocess to complete. Otherwise, it awaits completion by
            calling the `subprocess.Popen.wait()` method.
    """
    kwargs['stdout'] = Capture()
    kwargs['stderr'] = Capture()
    return run(cmd, **kwargs)

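# Illustrative sketch, not part of the module ('somecmd' is a hypothetical
# program that writes to both streams):
#
#     p = capture_both('somecmd --flag')
#     p.stdout.text, p.stderr.text, p.returncode
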
def get_stdout(cmd, **kwargs):
    """
    This is the same as `capture_stdout()`, but it returns the captured
    text. Use this when you know the output will not be voluminous - it
    will be buffered in memory.
    """
    p = capture_stdout(cmd, **kwargs)
    return p.stdout.text


def get_stderr(cmd, **kwargs):
    """
    This is the same as `capture_stderr()`, but it returns the captured
    text. Use this when you know the output will not be voluminous - it
    will be buffered in memory.
    """
    p = capture_stderr(cmd, **kwargs)
    return p.stderr.text


def get_both(cmd, **kwargs):
    """
    This is the same as `capture_both()`, but it returns the captured text
    from the two streams as a 2-element tuple, with the ``stdout`` text as
    the first element and the ``stderr`` text as the second. Use this when
    you know the output will not be voluminous - it will be buffered in
    memory.
    """
    p = capture_both(cmd, **kwargs)
    return p.stdout.text, p.stderr.text

def parse_command_line(source, posix=None):
    """
    Parse a command line into an AST.

    Args:
        source (str): The command line to parse.
        posix (bool): Whether POSIX conventions are used in the lexer.
    """
    if posix is None:
        posix = os.name == 'posix'
    return CommandLineParser().parse(source, posix=posix)

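# Illustrative sketch, not part of the module: a pipe produces a 'logical'
# node whose parts alternate command and pipe nodes (reprs follow
# Node.__repr__ above):
#
#     tree = parse_command_line('ls | grep py')
#     tree.kind    # -> 'logical'
#     tree.parts   # -> [CommandNode(...), PipeNode(pipe=|), CommandNode(...)]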