| import os |
| import sys |
| import stat |
| import select |
| import time |
| import errno |
| |
| try: |
| InterruptedError |
| except NameError: |
| # Alias Python2 exception to Python3 |
| InterruptedError = select.error |
| |
| if sys.version_info[0] >= 3: |
| string_types = (str,) |
| else: |
| string_types = (unicode, str) |
| |
| |
| def is_executable_file(path): |
| """Checks that path is an executable regular file, or a symlink towards one. |
| |
| This is roughly ``os.path isfile(path) and os.access(path, os.X_OK)``. |
| """ |
| # follow symlinks, |
| fpath = os.path.realpath(path) |
| |
| if not os.path.isfile(fpath): |
| # non-files (directories, fifo, etc.) |
| return False |
| |
| mode = os.stat(fpath).st_mode |
| |
| if (sys.platform.startswith('sunos') |
| and os.getuid() == 0): |
| # When root on Solaris, os.X_OK is True for *all* files, irregardless |
| # of their executability -- instead, any permission bit of any user, |
| # group, or other is fine enough. |
| # |
| # (This may be true for other "Unix98" OS's such as HP-UX and AIX) |
| return bool(mode & (stat.S_IXUSR | |
| stat.S_IXGRP | |
| stat.S_IXOTH)) |
| |
| return os.access(fpath, os.X_OK) |
| |
| |
| def which(filename, env=None): |
| '''This takes a given filename; tries to find it in the environment path; |
| then checks if it is executable. This returns the full path to the filename |
| if found and executable. Otherwise this returns None.''' |
| |
| # Special case where filename contains an explicit path. |
| if os.path.dirname(filename) != '' and is_executable_file(filename): |
| return filename |
| if env is None: |
| env = os.environ |
| p = env.get('PATH') |
| if not p: |
| p = os.defpath |
| pathlist = p.split(os.pathsep) |
| for path in pathlist: |
| ff = os.path.join(path, filename) |
| if is_executable_file(ff): |
| return ff |
| return None |
| |
| |
| def split_command_line(command_line): |
| |
| '''This splits a command line into a list of arguments. It splits arguments |
| on spaces, but handles embedded quotes, doublequotes, and escaped |
| characters. It's impossible to do this with a regular expression, so I |
| wrote a little state machine to parse the command line. ''' |
| |
| arg_list = [] |
| arg = '' |
| |
| # Constants to name the states we can be in. |
| state_basic = 0 |
| state_esc = 1 |
| state_singlequote = 2 |
| state_doublequote = 3 |
| # The state when consuming whitespace between commands. |
| state_whitespace = 4 |
| state = state_basic |
| |
| for c in command_line: |
| if state == state_basic or state == state_whitespace: |
| if c == '\\': |
| # Escape the next character |
| state = state_esc |
| elif c == r"'": |
| # Handle single quote |
| state = state_singlequote |
| elif c == r'"': |
| # Handle double quote |
| state = state_doublequote |
| elif c.isspace(): |
| # Add arg to arg_list if we aren't in the middle of whitespace. |
| if state == state_whitespace: |
| # Do nothing. |
| None |
| else: |
| arg_list.append(arg) |
| arg = '' |
| state = state_whitespace |
| else: |
| arg = arg + c |
| state = state_basic |
| elif state == state_esc: |
| arg = arg + c |
| state = state_basic |
| elif state == state_singlequote: |
| if c == r"'": |
| state = state_basic |
| else: |
| arg = arg + c |
| elif state == state_doublequote: |
| if c == r'"': |
| state = state_basic |
| else: |
| arg = arg + c |
| |
| if arg != '': |
| arg_list.append(arg) |
| return arg_list |
| |
| |
| def select_ignore_interrupts(iwtd, owtd, ewtd, timeout=None): |
| |
| '''This is a wrapper around select.select() that ignores signals. If |
| select.select raises a select.error exception and errno is an EINTR |
| error then it is ignored. Mainly this is used to ignore sigwinch |
| (terminal resize). ''' |
| |
| # if select() is interrupted by a signal (errno==EINTR) then |
| # we loop back and enter the select() again. |
| if timeout is not None: |
| end_time = time.time() + timeout |
| while True: |
| try: |
| return select.select(iwtd, owtd, ewtd, timeout) |
| except InterruptedError: |
| err = sys.exc_info()[1] |
| if err.args[0] == errno.EINTR: |
| # if we loop back we have to subtract the |
| # amount of time we already waited. |
| if timeout is not None: |
| timeout = end_time - time.time() |
| if timeout < 0: |
| return([], [], []) |
| else: |
| # something else caused the select.error, so |
| # this actually is an exception. |
| raise |
| |
| |
| def poll_ignore_interrupts(fds, timeout=None): |
| '''Simple wrapper around poll to register file descriptors and |
| ignore signals.''' |
| |
| if timeout is not None: |
| end_time = time.time() + timeout |
| |
| poller = select.poll() |
| for fd in fds: |
| poller.register(fd, select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR) |
| |
| while True: |
| try: |
| timeout_ms = None if timeout is None else timeout * 1000 |
| results = poller.poll(timeout_ms) |
| return [afd for afd, _ in results] |
| except InterruptedError: |
| err = sys.exc_info()[1] |
| if err.args[0] == errno.EINTR: |
| # if we loop back we have to subtract the |
| # amount of time we already waited. |
| if timeout is not None: |
| timeout = end_time - time.time() |
| if timeout < 0: |
| return [] |
| else: |
| # something else caused the select.error, so |
| # this actually is an exception. |
| raise |