# -*- coding: utf-8 -*-
#
# Copyright (C) 2012-2022 Vinay M. Sajip. See LICENSE for licensing information.
#
# sarge: Subprocess Allegedly Rewards Good Encapsulation :-)
#
import errno
from io import BytesIO
import logging
import os
try:
import queue
except ImportError: # pragma: no cover
import Queue as queue
import re
import shutil
import signal
import string
import subprocess
import sys
import threading
try:
from logging import NullHandler
except ImportError: # pragma: no cover
class NullHandler(logging.Handler):
def handle(self, record):
pass
emit = handle
def createLock(self):
self.lock = None
from .shlext import shell_shlex
__all__ = ('shell_quote', 'Capture', 'Command', 'ShellFormatter', 'Pipeline', 'Feeder',
'shell_format', 'run', 'parse_command_line', 'capture_stdout',
'capture_stderr', 'capture_both', 'get_stdout', 'get_stderr', 'get_both')
__version__ = '0.1.8.dev0'
__date__ = '2021-12-11'
logger = logging.getLogger(__name__)
logger.addHandler(NullHandler())
del NullHandler
# We use a separate logger for parsing, as that's sometimes too much
# information :-)
parse_logger = logging.getLogger('%s.parse' % __name__)
#
# This runs on Python 2.x and 3.x from the same code base - no need for 2to3.
#
if sys.version_info[0] < 3: # pragma: no cover
PY3 = False
text_type = unicode
binary_type = str
string_types = basestring,
_wait_has_timeout = False
else: # pragma: no cover
PY3 = True
text_type = str
binary_type = bytes
string_types = str,
basestring = str
_wait_has_timeout = sys.version_info[:2] >= (3, 3)
# This regex determines which shell input needs quoting
# because it may be unsafe
UNSAFE = re.compile(r'[^\w%+,./:=@-]')
def shell_quote(s):
"""
Quote text so that it is safe for Posix command shells.
For example, "*.py" would be converted to "'*.py'". If the text is
considered safe it is returned unquoted.
Args:
s (str): The value to quote
Returns:
str: A safe version of the input, from the point of view of POSIX
command shells
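
    For example (an illustrative doctest-style sketch)::

        >>> shell_quote('*.py')
        "'*.py'"
        >>> shell_quote('abc')
        'abc'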
"""
assert isinstance(s, string_types)
if not s:
result = "''"
elif not UNSAFE.search(s):
result = s
else:
result = "'%s'" % s.replace("'", r"'\''")
return result
class ShellFormatter(string.Formatter):
"""
This class overrides :class:`string.Formatter` to provide a custom
`convert_field()` method, which ensures that fields are quoted for
safety using `shell_quote()`.
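
    For example (a minimal sketch of the intended behaviour)::

        >>> ShellFormatter().format('ls {0}', '*.py')
        "ls '*.py'"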
"""
def convert_field(self, value, conversion):
"""
Convert a field to text.
If a conversion is specified (e.g. !s, !r), no quoting is performed.
If *no* conversion is specified, the value is converted to string
(using `str()`) and that value is quoted using `shell_quote()`
before being returned.
Args:
value (Any): The value to be converted.
conversion (str|None): The conversion to apply.
Returns:
str: The converted value.
"""
if conversion is None:
result = shell_quote(str(value))
else:
result = super(ShellFormatter, self).convert_field(value, conversion)
return result
class WithMixin(object):
"""
This class provides a very simple mixin for objects which can be used
in a ``with`` statement.
"""
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
default_capture_timeout = 0.02
default_expect_timeout = 5.0
class Capture(WithMixin):
"""
This class encapsulates an output stream of a sub-process. You just set
``stdout`` or ``stderr`` of a :class:`Command` or :class:`Pipeline` to an
instance of this class.
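
    Example (a sketch, assuming a POSIX-like system with ``echo`` available)::

        p = run('echo foo', stdout=Capture())
        assert p.stdout.text.rstrip() == 'foo'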
"""
counter = 1
# These are needed to allow wrapping using TextIOWrapper
readable = lambda self: True
writable = lambda self: False
seekable = readable
closed = False
def __init__(self, timeout=None, buffer_size=-1, encoding='utf-8'):
"""
Create a new instance.
Args:
            timeout (float): The timeout to use for this instance. If not specified,
                the module attribute ``default_capture_timeout`` is used.
            buffer_size (int): The buffer size to use when reading from streams.
                If zero, a 4K buffer is used; if negative (the default), the
                stream is read line by line.
encoding (str): The encoding to use.
"""
self.timeout = timeout or default_capture_timeout
self.streams = []
self.buffer = queue.Queue()
self.buffer_size = buffer_size or 4096
self.encoding = encoding
self.current = None
self._bytes = None
self.threads = []
self.pattern = None
self.matched = threading.Event()
self.match = None
self.match_index = 0
self.counter = self.__class__.counter
self.__class__.counter += 1
def add_stream(self, stream):
"""
Add a stream to this instance. A new thread is spawned to read from
the stream into the capture queue for this instance.
Args:
            stream (file): An output stream from a child process (i.e. the read
                end of a pipe whose write end is the output stream of the
                process).
"""
self.streams.append(stream)
ready = threading.Event()
t = threading.Thread(target=self.reader, args=(stream, ready))
logger.debug('Created thread %s as reader for %r', t.name, self)
self.threads.append(t)
t.daemon = True
t.start()
        logger.debug('%r: reader thread kicked off, waiting for it to start', self)
ready.wait()
logger.debug('%r: reader thread now started', self)
def reader(self, stream, ready):
"""
The callable used as the runnable in reader threads.
Args:
stream (file): The stream to read.
ready (threading.Event): An Event instance to set when the
reader thread starts executing.
"""
ready.set()
chunk_size = self.buffer_size
if chunk_size > 0:
logger.debug('%r: reader thread about to read %s', self, chunk_size)
else:
logger.debug('%r: reader thread about to read line', self)
self._done = False
while not self._done:
if chunk_size < 0:
chunk = stream.readline()
else:
chunk = stream.read(chunk_size)
if chunk:
self.buffer.put_nowait(chunk)
logger.debug('queued chunk of length %d to %s: %r', len(chunk), self.buffer,
chunk[:30])
if self.pattern and not self.matched.is_set():
self._try_match()
if chunk_size > 0:
if len(chunk) < chunk_size:
break
else:
if not chunk:
break
logger.debug('%r: finished reading stream %s', self, stream)
stream.close()
@property
def bytes(self):
"""
All the bytes in the capture queue.
"""
data = self.read()
if self._bytes is not None:
data = self._bytes + data
self._bytes = data
return data
@property
def text(self):
"""
All the bytes in the capture queue, decoded as text.
"""
return self.bytes.decode(self.encoding)
def streams_open(self):
result = False
for c in self.streams:
if not c.closed:
result = True
break
return result
    def read(self, size=-1, block=True, timeout=None):
"""
Read some bytes from this instance.
Args:
size (int): The number of bytes to read. If less than zero, the
reading continues until there is no more data.
block (bool): If ``True``, don't return until the required bytes
are available.
timeout (float): The timeout in seconds.
Returns:
bytes: The bytes requested.
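
        Example (an illustrative sketch)::

            p = run('echo hello', stdout=Capture())
            p.stdout.read(3)   # -> b'hel'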
"""
if not self.streams_open():
block = False
timeout = None
else:
timeout = timeout or self.timeout
if size <= 0:
b = []
while True:
try:
b.append(self.buffer.get(block, timeout))
except queue.Empty:
break
result = b''.join(b)
else:
if self.current is None:
try:
self.current = self.buffer.get(block, timeout)
except queue.Empty:
self.current = b''
while not self.current:
try:
self.current += self.buffer.get(block, timeout)
except queue.Empty:
break
if len(self.current) <= size:
result = self.current
self.current = None
else:
result = self.current[:size]
self.current = self.current[size:]
return result
def read1(self, n):
return self.read(n)
    def readline(self, size=-1, block=True, timeout=None):
"""
Read a line from this instance, optionally up to a certain size.
Args:
size (int): If specified as greater than zero, this many bytes
are returned even if not a complete line.
block (bool): If ``True``, don't return until the required bytes
are available.
timeout (float): The timeout in seconds.
"""
if not self.streams_open():
block = False
timeout = None
else:
timeout = timeout or self.timeout
if self.current is None:
try:
self.current = self.buffer.get(block, timeout)
except queue.Empty:
self.current = b''
while b'\n' not in self.current:
try:
self.current += self.buffer.get(block, timeout)
except queue.Empty:
break
if b'\n' not in self.current:
result = self.current
self.current = None
else:
i = self.current.index(b'\n')
if 0 < size < i:
i = size - 1
result = self.current[:i + 1]
self.current = self.current[i + 1:]
return result
    def readlines(self, sizehint=-1, block=True, timeout=None):
"""
Read multiple lines from this instance.
Args:
sizehint (int): A suggested number of bytes to read.
block (bool): If ``True``, don't return until the required bytes
are available.
timeout (float): The timeout in seconds.
"""
if not self.streams_open():
block = False
timeout = None
else:
timeout = timeout or self.timeout
data = self.read(sizehint, block, timeout)
if self.current is not None:
data = self.current + data
self.current = None
return data.splitlines(True)
def _try_match(self):
data = self.bytes
if data and self.pattern:
m = self.pattern.search(data, self.match_index)
if m:
logger.debug('Found at %s after %d bytes', m.span(), len(data))
self.match = m
self.match_index = m.end()
self.matched.set()
    def expect(self, pattern, timeout=None):
"""
Wait for a pattern to appear.
Args:
pattern (str|bytes): The pattern to wait for.
timeout (float): The timeout in seconds.
        Returns:
            The regular expression match object if the pattern was found, or
            ``None`` if it wasn't seen before the timeout expired.
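
        Example (a sketch; ``some_command`` is a placeholder for a child
        process that eventually prints a line containing "ready")::

            p = run('some_command', stdout=Capture(buffer_size=-1), async_=True)
            m = p.stdout.expect('ready', timeout=10.0)
            if m is None:
                pass  # pattern not seen before the timeout expired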
"""
def as_pattern(p):
if isinstance(p, string_types):
if isinstance(p, text_type):
p = p.encode('utf-8')
p = re.compile(p, re.MULTILINE)
return p
self.pattern = as_pattern(pattern)
self.matched.clear()
self.match = None
self._try_match()
if not self.match:
if timeout is None:
timeout = default_expect_timeout
self.matched.wait(timeout)
return self.match
def __iter__(self):
while True:
line = self.readline()
if not line:
break
yield line
def close(self, stop_threads=False):
"""
Close this instance. Once this is done, no attempts should be made to read from it.
We don't mark it as closed, so that a TextIOWrapper wrapping it won't fail
before all the input has been read.
Args:
stop_threads (bool): If ``True``, the threads populating this instance are
asked to stop.
"""
if stop_threads:
self._done = True # may lose some data sent from subprocess
for t in self.threads:
try:
t.join()
except RuntimeError: # pragma: no cover
logger.debug('failed to join thread: %s', t)
# raise
def __repr__(self): # pragma: no cover
return '%s-%d' % (self.__class__.__name__, self.counter)
class Feeder(object):
"""
Facilitate sending data to a child process over time rather than
just when the child is spawned.
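
    Example (a sketch; data is fed to ``cat`` after it has started)::

        feeder = Feeder()
        p = run('cat', input=feeder, async_=True)
        feeder.feed('hello')
        feeder.close()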
"""
def __init__(self):
self._r, self._w = os.pipe()
def fileno(self):
return self._r
def feed(self, data):
if isinstance(data, text_type):
data = data.encode('utf-8')
if not isinstance(data, bytes):
raise TypeError('Bytes expected, got %s' % type(data))
os.write(self._w, data)
def close(self):
if self._r:
os.close(self._r)
self._r = None
if self._w:
os.close(self._w)
self._w = None
def ensure_stream(input, encoding='utf-8'):
"""
Convert a possible text value into a binary file-like object.
"""
if isinstance(input, text_type):
input = input.encode(encoding) # need to be explicit for 2.x!
if isinstance(input, binary_type):
input = BytesIO(input)
logger.debug('returning %s', input)
return input
# Used to redirect stdout to stderr. Use a value less likely to clash with
# future additions to subprocess.py.
STDERR = -9
#
# A dummy redirects dict which indicates a desire to swap stdout and stderr
# in the child.
#
SWAP_OUTPUTS = {
1: ('>', ('&', 2)),
2: ('>', ('&', 3)),
3: ('>', ('&', 1)),
}
class Popen(subprocess.Popen):
"""
This is a subclass of :class:`subprocess.Popen` which is there in case we
need to provide specialised functionality for use in sarge. For example,
    we can't do >&2 redirection in subprocess.Popen, though we can do 2>&1.
"""
def _get_handles(self, stdin, stdout, stderr):
def close(h):
if h not in (-1, None):
if hasattr(h, 'Close'):
h.Close()
else:
os.close(h)
def dup(h):
if hasattr(self, '_make_inheritable'):
result = self._make_inheritable(h)
else:
result = os.dup(h)
return result
if stdout == STDERR and stderr == subprocess.STDOUT:
# Issue #15: Following Python issue #18851, a change was made
# to how Popen._get_handles works, which surfaced in Python 2.7.6.
# Instead of returning a 6-tuple of handles as it did in earlier
# versions, a 2-tuple is returned with the 6-tuple as first element
# and a set as a second element. At this time, only Python 2
# appears to have this change.
# To handle, we check for a 2-tuple return and act accordingly.
PIPE = subprocess.PIPE
t = super(Popen, self)._get_handles(stdin, PIPE, PIPE)
# logger.debug('Base _get_handles returned %s', t)
nreturned = len(t)
if nreturned == 2:
p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite = t[0]
to_close = t[1]
else:
p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite = t
logger.debug('swapping stdout and stderr')
if nreturned == 2:
# p2cread, c2pwrite and errwrite are closed in the parent.
# So we should add them to to_close, as subprocess will
# expect that.
for h in (p2cread, c2pwrite, errwrite):
if h is not None:
to_close.add(h)
result = (p2cread, p2cwrite, errread, errwrite, c2pread, c2pwrite), to_close
else:
result = p2cread, p2cwrite, errread, errwrite, c2pread, c2pwrite
# logger.debug('Our _get_handles returned %s', result)
return result
else:
orig_stdout = stdout
if stdout == STDERR:
stdout = None
t = super(Popen, self)._get_handles(stdin, stdout, stderr)
# logger.debug('Base _get_handles returned %s', t)
nreturned = len(t)
if nreturned == 2:
p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite = t[0]
to_close = t[1]
else:
p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite = t
if orig_stdout == STDERR:
                # c2pread and c2pwrite are None on 2.x and -1 on 3.x
close(c2pread)
close(c2pwrite)
c2pread = dup(errread)
c2pwrite = dup(errwrite)
if nreturned == 2:
# p2cread, c2pwrite and errwrite are closed in the parent.
# So we should add them to to_close, as subprocess will
# expect that.
for h in (p2cread, c2pwrite, errwrite):
if h is not None:
to_close.add(h)
result = (p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite), to_close
else:
result = p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite
# logger.debug('Our _get_handles returned %s', result)
return result
if os.name == 'posix' and sys.version_info[0] < 3:
# Issue #12: add restore_signals support to avoid spurious
# output on broken pipes
def _execute_child(self, args, executable, preexec_fn, *rest):
# can only call signal.signal in the main thread
# note that this test may give the wrong answer if
# called from a child process which was fork()ed
# from a non-main thread, as that becomes the main
# thread in the child process and will not be a _MainThread
# instance.
if not isinstance(threading.current_thread(), threading._MainThread):
preexec = preexec_fn
else:
def preexec():
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
if preexec_fn:
preexec_fn()
# logger.debug('Calling Base _execute_child: %s', ((args, executable,
# preexec, rest),))
super(Popen, self)._execute_child(args, executable, preexec, *rest)
def __repr__(self): # pragma: no cover
values = []
for attr in ('returncode', 'stdin', 'stdout', 'stderr'):
values.append('%s=%s' % (attr, getattr(self, attr, None)))
return '%s(%s)' % (self.__class__.__name__, ' '.join(values))
def copier(src, dest):
shutil.copyfileobj(src, dest)
dest.close()
class Command(object):
"""
This class represents a shell command to be run in a subprocess.
Args:
        args (str|list[str]): The command string or array of command/args to be run.
kwargs (dict): The same as you would pass to `subprocess.Popen`.
However, the ``env`` parameter is handled differently: it
is treated as a set of *additional* environment variables
to be added to the values in ``os.environ``, unless the
``replace_env`` keyword argument is present and truthy, in
which case the env value is used *in place of*
``os.environ``.
.. versionadded:: 0.1.6
The ``replace_env`` keyword argument was added.
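
    Example (a usage sketch, assuming a POSIX-like system)::

        cmd = Command('ls -l', stdout=Capture())
        cmd.run()
        print(cmd.stdout.text)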
"""
def __init__(self, args, **kwargs):
replace_env = kwargs.pop('replace_env', False)
shell = kwargs.get('shell')
if not shell and isinstance(args, string_types):
args = list(shell_shlex(args, control='();>|&'))
self.args = args
self.kwargs = kwargs
# check for input specified
if kwargs.get('stdin'):
raise ValueError('Inputs need to be specified via the run method.')
# special handling of Capture instances in stdout, stderr
for attr in ('stdout', 'stderr'):
s = kwargs.get(attr)
if isinstance(s, Capture):
kwargs[attr] = subprocess.PIPE
setattr(self, attr, s)
# special handling: env is added to os.environ
e = kwargs.get('env')
if e:
if replace_env:
env = e
else:
env = dict(os.environ)
env.update(e)
kwargs['env'] = env
self.process_ready = threading.Event()
self.process = None
self.exception = None
logger.debug('%r created', self)
def __repr__(self): # pragma: no cover
if isinstance(self.args, basestring):
s = self.args
else:
s = ' '.join(self.args)
return '%s(%r)' % (self.__class__.__name__, s)
    def run(self, input=None, async_=False):
"""
Run the command with optional input and either synchronously or
asynchronously.
Args:
input (str|bytes|file): The input to pass to the command subprocess.
If this is text, it is encoded to bytes using UTF-8.
If it is a byte string, it is used as is. Otherwise, a
file-like object containing bytes should be passed: this
will be read to the end, but not closed.
async_ (bool): If ``True``, this method returns without waiting for the
subprocess to complete. Otherwise, it awaits completion
by calling the `subprocess.Popen.wait()` method.
"""
# noinspection PyBroadException
if input is None:
self.kwargs['stdin'] = None
else:
input = ensure_stream(input)
if not isinstance(input, BytesIO):
if hasattr(input, 'fileno'):
input = input.fileno()
self.kwargs['stdin'] = input
else:
self.kwargs['stdin'] = subprocess.PIPE
logger.debug('About to call Popen: %s, %s', self.args, self.kwargs)
try:
self.process = p = Popen(self.args, **self.kwargs)
        except Exception as e:  # pragma: no cover
self.process_ready.set()
if isinstance(e, OSError) and e.errno == errno.ENOENT:
ve = ValueError('Command not found: %s' % self.args[0])
self.exception = ve
logger.exception('Popen call failed: %s: %s', type(ve), ve)
raise ve
logger.exception('Popen call failed: %s: %s', type(e), e)
self.exception = e
raise
self.stdin = p.stdin
logger.debug('Popen: %s, %s -> %s', self, self.kwargs, p.__dict__)
if isinstance(input, BytesIO):
t = threading.Thread(target=copier, args=(input, p.stdin))
t.daemon = True
t.start()
logger.debug('copier thread started: %s', t.name)
# The thread may take a while to finish, but we want to wait
# until it's on its way; otherwise, any pipeline dependent on
# this input could be dead-locked.
# Possibly a better mechanism than just waiting awhile is needed.
# I've tried a threading.Event passed in from here which gets
# set after a chunk has been written in the copier and which we
# wait for here, but that doesn't seem to work reliably.
t.join(0.0001)
for attr in ('stdout', 'stderr'):
s = getattr(self, attr, None)
if isinstance(s, Capture):
s.add_stream(getattr(p, attr))
self.process_ready.set()
if not async_:
logger.debug('about to wait for process %s', self)
p.wait()
logger.debug('returning %s (%s)', self, self.process)
return self
    def wait(self, timeout=None):
"""
Wait for a command's underlying sub-process to complete.
Args:
timeout (float): How many seconds to wait.
This parameter only applies for
Python >= 3.3 and has no effect otherwise.
"""
self.process_ready.wait()
p = self.process
if not p: # pragma: no cover
logger.warning('No process found for %s', self)
result = None
else:
if _wait_has_timeout:
result = p.wait(timeout)
else:
result = p.wait()
return result
    def terminate(self):
"""
Terminate a command's underlying subprocess.
.. versionadded:: 0.1.1
"""
self.process_ready.wait()
p = self.process
if not p: # pragma: no cover
raise ValueError('There is no subprocess')
p.terminate()
    def kill(self):
"""
Kill a command's underlying subprocess.
.. versionadded:: 0.1.1
"""
self.process_ready.wait()
p = self.process
if not p: # pragma: no cover
raise ValueError('There is no subprocess')
p.kill()
    def poll(self):
"""
Poll a command's underlying subprocess.
.. versionadded:: 0.1.1
"""
self.process_ready.wait()
p = self.process
if not p: # pragma: no cover
raise ValueError('There is no subprocess')
return p.poll()
@property
def returncode(self):
self.process_ready.wait()
return self.process.returncode if self.process else None
class Node(object):
"""
This class represents a node in the AST built while parsing command lines.
It's basically an object container for various attributes, with a slightly
specialised representation to make it a little easier to debug the parser.
"""
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
def __repr__(self): # pragma: no cover
chunks = []
d = dict(self.__dict__)
kind = d.pop('kind')
for k, v in sorted(d.items()):
chunks.append('%s=%s' % (k, v))
return '%sNode(%s)' % (kind.title(), ' '.join(chunks))
class CommandLineParser(object):
"""
This class implements a fairly unsophisticated recursive descent parser for
    shell command lines as used in sh, bash and dash. On Windows, the cmd.exe
    command shell has only limited compatibility.
"""
permitted_tokens = ('&&', '||', '|&', '>>')
def next_token(self):
t = self.lex.get_token()
if not t:
tt = None
else:
tt = self.lex.token_type
if tt in ('"', "'"):
tt = 'word'
t = t[1:-1]
elif tt == 'a':
try:
int(t)
tt = 'number'
except ValueError:
tt = 'word'
elif tt == 'c':
# the shlex parser will return arbitrary runs of 'control'
# characters, but only some runs will be valid for us. We
# split into the valid runs and push all the others back,
# keeping just one. Since all will have a token_type of
# 'c', we don't need to worry about pushing the type back.
if len(t) > 1:
valid = self.get_valid_controls(t)
t = valid.pop(0)
if valid:
for other in reversed(valid):
self.lex.push_token(other)
tt = t
return tt, t, self.lex.preceding
def get_valid_controls(self, t):
if len(t) == 1:
result = [t]
else:
result = []
last = None
for c in t:
if last is not None:
combined = last + c
if combined in self.permitted_tokens:
result.append(combined)
else:
result.append(last)
result.append(c)
last = None
elif c not in ('>', '&', '|'):
result.append(c)
else:
last = c
if last:
result.append(last)
# logger.debug('%s -> %s', t, result)
return result
def peek_token(self):
if self.peek is None:
self.peek = self.next_token()
return self.peek[0]
def consume(self, tt):
self.token = self.peek
self.peek = self.next_token()
if self.token[0] != tt:
            raise ValueError('consume: expected %r' % tt)
def parse(self, source, posix=None):
self.source = source
parse_logger.debug('starting parse of %r', source)
if posix is None:
posix = os.name == 'posix'
self.lex = shell_shlex(source, posix=posix, control=True)
self.token = None
self.peek = None
self.peek_token()
result = self.parse_list()
return result
def parse_list(self):
parts = [self.parse_pipeline()]
tt = self.peek_token()
while tt in (';', '&'):
self.consume(tt)
part = self.parse_pipeline()
parts.append(Node(kind='sync', sync=tt))
parts.append(part)
tt = self.peek_token()
if len(parts) == 1:
node = parts[0]
else:
node = Node(kind='list', parts=parts)
parse_logger.debug('returning %r', node)
return node
def parse_pipeline(self):
parts = [self.parse_logical()]
tt = self.peek_token()
while tt in ('&&', '||'):
self.consume(tt)
part = self.parse_logical()
parts.append(Node(kind='check', check=tt))
parts.append(part)
tt = self.peek_token()
if len(parts) == 1:
node = parts[0]
else:
node = Node(kind='pipeline', parts=parts)
parse_logger.debug('returning %r', node)
return node
def parse_logical(self):
tt = self.peek_token()
if tt == '(':
self.consume(tt)
node = self.parse_list()
self.consume(')')
else:
parts = [self.parse_command()]
tt = self.peek_token()
while tt in ('|', '|&'):
last_part = parts[-1]
if ((tt == '|' and 1 in last_part.redirects)
or (tt == '|&' and 2 in last_part.redirects)):
if last_part.redirects != SWAP_OUTPUTS:
raise ValueError('semantics: cannot redirect and pipe the '
'same stream')
self.consume(tt)
part = self.parse_command()
parts.append(Node(kind='pipe', pipe=tt))
parts.append(part)
tt = self.peek_token()
if len(parts) == 1:
node = parts[0]
else:
node = Node(kind='logical', parts=parts)
parse_logger.debug('returning %r', node)
return node
def add_redirection(self, node, fd, kind, dest):
if fd in node.redirects:
raise ValueError('semantics: cannot redirect stream %d twice' % fd)
node.redirects[fd] = (kind, dest)
def parse_command(self):
node = self.parse_command_part()
tt = self.peek_token()
while tt in ('word', 'number'):
part = self.parse_command_part()
node.command.extend(part.command)
for fd, v in part.redirects.items():
self.add_redirection(node, fd, v[0], v[1])
tt = self.peek_token()
parse_logger.debug('returning %r', node)
if node.redirects != SWAP_OUTPUTS:
d = dict(node.redirects)
d.pop(1, None)
d.pop(2, None)
if d:
raise ValueError('semantics: can only redirect stdout and '
'stderr, not %s' % list(d.keys()))
if sys.platform == 'win32': # pragma: no cover
from .utils import find_command
cmd = find_command(node.command[0])
if cmd:
exe, cmd = cmd
node.command[0] = cmd
if exe:
node.command.insert(0, exe)
return node
def parse_command_part(self):
node = Node(kind='command', command=[self.peek[1]], redirects={})
if self.peek[0] == 'word':
self.consume('word')
else:
self.consume('number')
tt = self.peek_token()
while tt in ('>', '>>'):
num = 1 # default value
if self.peek[2] == '':
# > or >> seen without preceding whitespace. So see if the
# last token is a positive integer. If it is, assume it's
# an fd to redirect and pop it, else leave it in as part of
# the command line.
try:
try_num = int(node.command[-1])
if try_num > 0:
num = try_num
node.command.pop()
except ValueError:
pass
redirect_kind = tt
self.consume(tt)
tt = self.peek_token()
if tt not in ('word', '&'):
raise ValueError('syntax: expecting filename or &')
if tt == 'word':
redirect_target = self.peek[1]
self.consume(tt)
else:
self.consume('&')
if self.peek_token() != 'number':
raise ValueError('syntax: number expected after &')
n = int(self.peek[1])
redirect_target = ('&', n)
self.consume('number')
self.add_redirection(node, num, redirect_kind, redirect_target)
tt = self.peek_token()
parse_logger.debug('returning %r', node)
return node
class Pipeline(WithMixin):
"""
This class represents a pipeline of commands.
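
    Example (a sketch, assuming a POSIX-like shell environment)::

        with Pipeline('ls | grep py', stdout=Capture()) as p:
            p.run()
            print(p.stdout.text)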
"""
def __init__(self, source, posix=None, **kwargs):
"""
Initialize a new instance.
Args:
source (str|list|tuple): The command line.
posix (bool): Whether POSIX conventions are used in the lexer.
kwargs (dict): Whatever you might pass to `subprocess.Popen`.
"""
if posix is None:
posix = os.name == 'posix'
is_shell = kwargs.get('shell', False)
if isinstance(source, (list, tuple)) or is_shell:
if is_shell:
self.source = source
else:
self.source = ' '.join(source)
t = Node(kind='command', command=source, redirects={})
else:
self.source = source
t = CommandLineParser().parse(source, posix=posix)
logger.debug('command tree: %s', t)
self.tree = t
self.last = self.find_last_command(t)
self.events = []
self.kwargs = kwargs
self.stdout = kwargs.pop('stdout', None)
self.stderr = kwargs.pop('stderr', None)
self.lock = threading.RLock()
self.commands = []
def find_last_command(self, node):
"""
Find the last command node in a parse sub-tree.
Args:
node (Node): The root of the sub-tree to search.
"""
if not hasattr(node, 'parts'):
result = node
else:
result = self.find_last_command(node.parts[-1])
assert result.kind == 'command'
return result
def run_node_in_thread(self, node, input, async_):
"""
Run a node in a separate thread. A thread is created and
the `run_node()` method is run with the specified arguments
in that thread.
Args:
node (Node): The node to run.
input (str|bytes): The data to pass to the node's command.
async_ (bool): This is passed to `run_node()`.
"""
# When the node is run in a separate thread, we need
# a sync point for when all the commands have been created
# for that node - even when there are delays because of e.g.
# sleep commands or other time-consuming commands. That's
# what these events are for - they're set at the end of
# run_node, and waited on in the pipeline's wait and run
# methods.
e = threading.Event()
with self.lock:
self.events.append(e)
t = threading.Thread(target=self.run_node, args=(node, input, async_, e))
t.daemon = True
logger.debug('thread %s started to run node: %s', t.name, node)
t.start()
    def run(self, input=None, async_=False):
"""
Run the commands in the pipeline.
Args:
input (str|bytes|file): The data to pass to the command.
async_ (bool): If `True`, don't wait for the pipeline to
complete before returning.
"""
self.commands = []
self.opened = []
node = self.tree
# Issue #20: run in thread if async
if async_:
self.run_node_in_thread(node, input, async_=True)
else:
self.run_node(node, input=input, async_=False)
return self
@property
def returncode(self):
"""
The return code of the last command to run, which is regarded as the
overall result of the pipeline.
"""
if self.commands:
return self.commands[-1].process.returncode
@property
def processes(self):
"""
A list of the :class:`subprocess.Popen` instances for all the
commands which have been run.
"""
result = []
if self.commands:
result = [c.process for c in self.commands]
return result
@property
def returncodes(self):
"""
A list of the return codes for all the commands which have been run.
"""
result = []
if self.commands:
for c in self.commands:
rc = None
if c.process:
rc = c.process.poll()
result.append(rc)
return result
@property
def exceptions(self):
"""
A list of any exceptions raised by commands which have been run.
"""
result = [c.exception for c in self.commands if c]
return result
def wait_events(self):
"""
        Wait for all the events in the pipeline to be set.
"""
for e in self.events:
e.wait()
    def wait(self, timeout=None):
"""
Wait for all the commands in the pipeline to complete.
Args:
timeout (float): The timeout in seconds.
This parameter only applies for
Python >= 3.3 and has no effect otherwise.
It will be applied to each command in turn,
so the effect could be cumulative.
"""
logger.debug('pipeline waiting')
self.wait_events()
for cmd in self.commands:
logger.debug('waiting for command %s', cmd)
cmd.wait(timeout)
    def close(self):
"""
Close the pipeline.
This waits for all the commands in the pipeline to
complete, but also closes all the opened streams once all the commands
have completed.
"""
logger.debug('pipeline closing')
self.wait_events()
for cmd in self.commands:
cmd.wait()
p = cmd.process
if p is None: # pragma: no cover
continue
for attr in ('stdout', 'stderr'):
s = getattr(self, attr)
if isinstance(s, Capture):
s.close()
if p.stdout:
p.stdout.close()
if p.stderr:
p.stderr.close()
for stream in self.opened:
stream.close()
    def poll_last(self):
"""
Check if the last command to run has terminated, and return its exit
code, if available.
"""
if self.commands:
return self.commands[-1].poll()
    def poll_all(self):
"""
Check if all commands to run have terminated. Return a list of exit
codes, where available.
"""
result = []
if self.commands:
result = [c.poll() for c in self.commands]
return result
def run_node(self, node, input, async_, event=None):
"""
This runs a single node in the parse tree.
Args:
node (Node): The node to run.
input (str|bytes|file): The data to pass to the command.
async_ (bool): If `True`, don't wait for the pipeline to complete
before returning.
            event (threading.Event): If specified, the event is set (via
                `threading.Event.set()`) once the node has been run.
"""
kind = node.kind
method = 'run_%s_node' % kind
logger.debug('%s %s %s', method, async_, event)
try:
result = getattr(self, method)(node, input, async_)
return result
except Exception as e:
logger.exception('Failed: %s', e)
node.exception = e
raise
finally:
if event:
event.set()
def new_command(self, args, **kwargs):
"""
Create a new `Command` from the provided arguments,
and append it to the list of commands.
Args:
args (list[str]): The command and arguments to be created.
"""
cmd = Command(args, **kwargs)
with self.lock:
self.commands.append(cmd)
return cmd
def get_redirects(self, node):
"""
Get the redirects for a node, if any.
Args:
node (Node): An AST node from the parser.
Returns:
tuple: The ``stdout`` and ``stderr`` redirect targets.
If either of these is not specified, the
corresponding value in the result will be ``None``.
"""
stdout = stderr = None
for fd, fs in node.redirects.items():
pos, fn = fs
if pos == '>':
mode = 'wb'
else:
mode = 'ab'
if isinstance(fn, string_types):
# Issue 9: open redirection outputs relative to cwd
if 'cwd' in self.kwargs:
fn = os.path.join(self.kwargs['cwd'], fn)
stream = open(fn, mode)
with self.lock:
self.opened.append(stream)
elif fd == 1:
assert fn == ('&', 2)
stream = STDERR
elif fd == 2:
assert fn == ('&', 1)
stream = subprocess.STDOUT
if fd == 1:
stdout = stream
else:
stderr = stream
return stdout, stderr
def run_logical_node(self, node, input, async_):
"""
This runs a 'logical' node in the parse tree.
Args:
node (Node): The node to run.
input (str|bytes|file): The data to pass to the command.
Text will be encoded using UTF-8.
async_ (bool): If `True`, don't wait for the pipeline to complete
before returning.
"""
logger.debug('started: %s, %s, %s', node, input, async_)
parts = node.parts
last = len(parts) - 1
assert last > 1
prev = pipe = None
i = 0
while i <= last:
curr = parts[i]
if prev is None:
if not input:
stdin = None
else:
stdin = ensure_stream(input)
else:
if pipe == '|':
stdin = prev.process.stdout
else:
stdin = prev.process.stderr
if curr.redirects == SWAP_OUTPUTS:
stdout = STDERR
stderr = subprocess.STDOUT
else:
try:
stdout, stderr = self.get_redirects(curr)
except IOError: # pragma: no cover
if prev and stdin == prev.process.stdout:
stdin.close()
raise
if i < last:
pipe = parts[i + 1].pipe
if pipe == '|':
assert stdout in (None, STDERR)
if stdout is None:
stdout = subprocess.PIPE
else:
assert stderr in (None, subprocess.STDOUT)
if stderr is None:
stderr = subprocess.PIPE
use_async = True
else:
if stdout == STDERR:
assert self.stdout is None
use_async = async_
curr.cmd = self.new_command(curr.command,
stdout=stdout or self.stdout,
stderr=stderr or self.stderr,
**self.kwargs)
curr.cmd.run(input=stdin, async_=use_async)
# Issue 12: close stdin after spawning the child that uses it
if prev and stdin == prev.process.stdout:
stdin.close()
prev = curr.cmd
i += 2
def run_command_node(self, node, input, async_):
"""
This runs a 'command' node in the parse tree.
Args:
node (Node): The node to run.
input (str|bytes|file): The data to pass to the command.
async_ (bool): If `True`, don't wait for the pipeline to
complete before returning.
"""
logger.debug('started: %s, %s, %s', node, input, async_)
kwargs = dict(self.kwargs)
stdout, stderr = self.get_redirects(node)
if node != self.last:
kwargs['stdout'] = stdout or self.stdout
kwargs['stderr'] = stderr or self.stderr
else:
if self.stdout and stdout:
raise ValueError('You cannot redirect one stream to two '
'places')
kwargs['stdout'] = self.stdout or stdout
if self.stderr and stderr:
raise ValueError('You cannot redirect one stream to two '
'places')
kwargs['stderr'] = self.stderr or stderr
node.cmd = self.new_command(node.command, **kwargs)
try:
node.cmd.run(input=input, async_=async_)
except Exception as e:
from .utils import is_main_thread
if is_main_thread():
raise
# if not the main thread, then the exception should have been stored in
# node, so just do nothing more
assert node.cmd.exception is not None
def get_status(self, node):
"""
        Get the return code for a node. For a node with multiple commands,
        the return code of the last command (as determined by
        `find_last_command()`) is returned.
Args:
node (Node): The node to query.
Returns:
int|None: The return code for the node.
"""
if node.kind == 'command':
last = node
else:
last = self.find_last_command(node)
return last.cmd.process.returncode
def run_pipeline_node(self, node, input, async_):
"""
This runs a 'pipeline' node in the parse tree.
Args:
node (Node): The node to run.
input (str|bytes|file): The data to pass to the command.
            async_ (bool): If `True`, don't wait for the pipeline to
complete before returning.
"""
logger.debug('started: %s, %s, %s', node, input, async_)
parts = node.parts
last = len(parts) - 1
assert last > 1
prev = None
i = 0
while i <= last:
curr = parts[i]
if prev is not None:
input = None
elif input is not None:
input = ensure_stream(input)
# run the current command
if i < last:
# need to know status, so run with async_=False
use_async = False
else:
use_async = async_
self.run_node(curr, input, async_=use_async)
if i < last:
check = parts[i + 1].check
if check == '&&':
if self.get_status(curr) != 0:
break
else:
if self.get_status(curr) == 0:
break
prev = curr
i += 2
def run_list_node(self, node, input, async_):
"""
This runs a 'list' node in the parse tree.
Args:
node (Node): The node to run.
input (str|bytes|file): The data to pass to the command.
            async_ (bool): If `True`, don't wait for the pipeline to
complete before returning.
"""
logger.debug('started: %s, %s, %s', node, input, async_)
parts = node.parts
last = len(parts) - 1
assert last > 1
prev = None
i = 0
while i <= last:
curr = parts[i]
if prev is not None:
input = None
elif input is not None:
input = ensure_stream(input)
if i < last:
use_async = parts[i + 1].sync == '&'
else:
use_async = async_
# run the current command
if not use_async:
self.run_node(curr, input, async_=use_async)
else:
self.run_node_in_thread(curr, input, async_=False)
prev = curr
i += 2
# Module-level convenience functions
def run(cmd, **kwargs):
"""
Run a command with optional input and either synchronously or
asynchronously.
Apart from the ``input`` and ``async_`` keyword arguments described below,
other keyword arguments are passed to the created :class:`Pipeline`
instance, and thence to :class:`subprocess.Popen` via a :class:`Command`
instance. Note that the ``env`` kwarg is treated differently to how it is
in :class:`~subprocess.Popen`: it is treated as a set of *additional*
environment variables to be added to the values in ``os.environ``.
Args:
        cmd (str|list[str]): The command string or array of command/args to be run.
input (str|bytes|file): The input to pass to the command subprocess.
async_ (bool): If `True`, this method returns without waiting for the
subprocess to complete. Otherwise, it awaits completion
by calling the `subprocess.Popen.wait()` method.
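
    Example (a minimal sketch, assuming a POSIX-like system)::

        p = run('sleep 5', async_=True)   # returns immediately
        p.wait()                          # blocks until the child exits
        print(p.returncode)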
"""
input = kwargs.pop('input', None)
async_ = kwargs.pop('async_', False)
if async_:
p = Pipeline(cmd, **kwargs)
p.run(input=input, async_=True)
else:
with Pipeline(cmd, **kwargs) as p:
p.run(input=input, async_=async_)
return p
def capture_stdout(cmd, **kwargs):
"""
This is the same as `run()`, but the ``stdout`` is captured. You can
access this via the ``stdout`` attribute of the return value from this
function.
Args:
        cmd (str|list[str]): The command string or array of command/args to be run.
input (str|bytes|file): The input to pass to the command subprocess.
async_ (bool): If `True`, this method returns without waiting for the
subprocess to complete. Otherwise, it awaits completion
by calling the `subprocess.Popen.wait()` method.
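
    Example (a sketch)::

        p = capture_stdout('echo hello')
        print(p.stdout.text)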
"""
kwargs['stdout'] = Capture()
return run(cmd, **kwargs)
def capture_stderr(cmd, **kwargs):
"""
This is the same as `run()`, but the ``stderr`` is captured. You can
access this via the ``stderr`` attribute of the return value from this
function.
Args:
        cmd (str|list[str]): The command string or array of command/args to be run.
input (str|bytes|file): The input to pass to the command subprocess.
async_ (bool): If `True`, this method returns without waiting for the
subprocess to complete. Otherwise, it awaits completion
by calling the `subprocess.Popen.wait()` method.
"""
kwargs['stderr'] = Capture()
return run(cmd, **kwargs)
def capture_both(cmd, **kwargs):
"""
This is the same as `run()`, but the ``stdout`` and ``stderr`` are
both captured. You can access these via the ``stdout`` and
``stderr`` attributes of the return value from this function.
Args:
        cmd (str|list[str]): The command string or array of command/args to be run.
input (str|bytes|file): The input to pass to the command subprocess.
async_ (bool): If `True`, this method returns without waiting for the
subprocess to complete. Otherwise, it awaits completion
by calling the `subprocess.Popen.wait()` method.
"""
kwargs['stdout'] = Capture()
kwargs['stderr'] = Capture()
return run(cmd, **kwargs)
def get_stdout(cmd, **kwargs):
"""
This is the same as `capture_stdout()`, but it returns the captured
text. Use this when you know the output will not be voluminous - it will
be buffered in memory.
"""
p = capture_stdout(cmd, **kwargs)
return p.stdout.text
def get_stderr(cmd, **kwargs):
"""
This is the same as `capture_stderr()`, but it returns the captured
text. Use this when you know the output will not be voluminous - it will
be buffered in memory.
"""
p = capture_stderr(cmd, **kwargs)
return p.stderr.text
def get_both(cmd, **kwargs):
"""
This is the same as `capture_both()`, but it returns the captured
text from the two streams as a 2-element tuple, with the ``stdout`` text as
the first element and the ``stderr`` text as the second. Use this when you
know the output will not be voluminous - it will be buffered in memory.
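
    Example (a sketch)::

        out, err = get_both('echo hello')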
"""
p = capture_both(cmd, **kwargs)
return p.stdout.text, p.stderr.text
def parse_command_line(source, posix=None):
"""
Parse a command line into an AST.
Args:
source (str): The command line to parse.
posix (bool): Whether POSIX conventions are used in the lexer.
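
    Example (a sketch; the result is the root ``Node`` of the parse tree)::

        tree = parse_command_line('ls | grep py > out.txt')
        print(tree)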
"""
if posix is None:
posix = os.name == 'posix'
return CommandLineParser().parse(source, posix=posix)