| """Generic wrapper for read-eval-print-loops, a.k.a. interactive shells |
| """ |
| import os.path |
| import signal |
| import sys |
| |
| import pexpect |
| |
# True when running on Python 3 or newer.
PY3 = sys.version_info >= (3,)

if PY3:
    # Python 3 has no ``basestring``; alias it to ``str`` so the
    # isinstance() check in REPLWrapper.__init__ works on both major versions.
    basestring = str

# Default values for REPLWrapper's ``new_prompt`` and ``continuation_prompt``
# arguments.
PEXPECT_PROMPT = u'[PEXPECT_PROMPT>'
PEXPECT_CONTINUATION_PROMPT = u'[PEXPECT_PROMPT+'
| |
class REPLWrapper(object):
    """Drive a read-eval-print loop running in a pexpect child process.

    :param cmd_or_spawn: Either a str command used to start a new REPL
      process, or an existing :class:`pexpect.spawn` instance in which a REPL
      is already running.
    :param str orig_prompt: The prompt the REPL shows initially.
    :param str prompt_change: A command that switches the REPL to a more
      unique prompt, or ``None`` to keep the original prompt. It is formatted
      with the new prompt and the continuation prompt as positional
      parameters, so ``{}`` style placeholders can be used to insert them.
    :param str new_prompt: The prompt to expect once *prompt_change* has run.
    :param str continuation_prompt: The prompt the REPL shows when it is
      waiting for more input to complete a command.
    :param str extra_init_cmd: Optional command run once after start-up for
      extra initialisation, such as disabling pagers.
    """
    def __init__(self, cmd_or_spawn, orig_prompt, prompt_change,
                 new_prompt=PEXPECT_PROMPT,
                 continuation_prompt=PEXPECT_CONTINUATION_PROMPT,
                 extra_init_cmd=None):
        if not isinstance(cmd_or_spawn, basestring):
            # Caller handed us a ready-made spawn object.
            self.child = cmd_or_spawn
        else:
            self.child = pexpect.spawn(cmd_or_spawn, echo=False, encoding='utf-8')

        if self.child.echo:
            # Echo is still on for this child; switch it off so that what
            # we send is not repeated back in the captured output.
            self.child.setecho(False)
            self.child.waitnoecho()

        if prompt_change is not None:
            change_cmd = prompt_change.format(new_prompt, continuation_prompt)
            self.set_prompt(orig_prompt, change_cmd)
            self.prompt = new_prompt
        else:
            self.prompt = orig_prompt
        self.continuation_prompt = continuation_prompt

        # Consume the first prompt so the REPL is ready for run_command().
        self._expect_prompt()

        if extra_init_cmd is not None:
            self.run_command(extra_init_cmd)

    def set_prompt(self, orig_prompt, prompt_change):
        """Wait for *orig_prompt*, then send the *prompt_change* command.

        *orig_prompt* is passed to the child's ``expect``; *prompt_change* is
        sent as a single line.
        """
        child = self.child
        child.expect(orig_prompt)
        child.sendline(prompt_change)

    def _expect_prompt(self, timeout=-1):
        """Wait for the main or the continuation prompt.

        Returns the index reported by ``expect_exact``: 0 for the main
        prompt, 1 for the continuation prompt.
        """
        candidates = [self.prompt, self.continuation_prompt]
        return self.child.expect_exact(candidates, timeout=timeout)

    def run_command(self, command, timeout=-1):
        """Send a command to the REPL, wait for it to finish, return its output.

        :param str command: The command to send. Trailing newlines are not
          needed. It should be a complete block of input that triggers
          execution; if the REPL answers with a continuation prompt after
          all input has been sent, :exc:`ValueError` is raised.
        :param int timeout: How long to wait for the next prompt. -1 means
          the default from the :class:`pexpect.spawn` object (default 30
          seconds). None means to wait indefinitely.
        :raises ValueError: If *command* contains no lines, or if the input
          turned out to be incomplete.
        """
        # Feed multi-line commands to the REPL one line at a time.
        lines = command.splitlines()
        if command.endswith('\n'):
            # splitlines() drops the trailing newline; restore it as an
            # empty final line.
            lines.append('')
        if not lines:
            raise ValueError("No command was given")

        first_line, remaining = lines[0], lines[1:]
        chunks = []
        self.child.sendline(first_line)
        for next_line in remaining:
            self._expect_prompt(timeout=timeout)
            chunks.append(self.child.before)
            self.child.sendline(next_line)

        # Everything has been sent; see which prompt comes back.
        matched = self._expect_prompt(timeout=timeout)
        if matched == 1:
            # Continuation prompt: the REPL still wants more input.
            # Interrupt it and wait briefly for a prompt before reporting.
            self.child.kill(signal.SIGINT)
            self._expect_prompt(timeout=1)
            raise ValueError("Continuation prompt found - input was incomplete:\n"
                             + command)
        chunks.append(self.child.before)
        return u''.join(chunks)
| |
def python(command="python"):
    """Start a Python shell and return a :class:`REPLWrapper` object.

    :param str command: The command used to launch the interpreter.
    """
    initial_prompt = u">>> "
    # Formatted by REPLWrapper with the new and continuation prompts.
    change_prompts = u"import sys; sys.ps1={0!r}; sys.ps2={1!r}"
    return REPLWrapper(command, initial_prompt, change_prompts)
| |
def bash(command="bash"):
    """Start a bash shell and return a :class:`REPLWrapper` object.

    :param str command: The command used to launch bash. It is started with
      ``--rcfile`` pointing at the ``bashrc.sh`` next to this module.
    """
    rcfile = os.path.join(os.path.dirname(__file__), 'bashrc.sh')
    shell = pexpect.spawn(command, ['--rcfile', rcfile], echo=False,
                          encoding='utf-8')

    def with_invisible_marker(prompt):
        # If the user runs 'env', the value of PS1 shows up in the output.
        # To keep replwrap from mistaking that for the next prompt, embed
        # bash's markers for invisible characters in the prompt: they are
        # visible when inspecting the environment variable, but not when
        # bash displays the prompt.
        return prompt[:5] + u'\\[\\]' + prompt[5:]

    prompt_change = u"PS1='{0}' PS2='{1}' PROMPT_COMMAND=''".format(
        with_invisible_marker(PEXPECT_PROMPT),
        with_invisible_marker(PEXPECT_CONTINUATION_PROMPT))

    return REPLWrapper(shell, u'\\$', prompt_change,
                       extra_init_cmd="export PAGER=cat")