| import itertools |
| |
| from ShCommands import Command, Pipeline |
| |
| def tcl_preprocess(data): |
| # Tcl has a preprocessing step to replace escaped newlines. |
| i = data.find('\\\n') |
| if i == -1: |
| return data |
| |
| # Replace '\\\n' and subsequent whitespace by a single space. |
| n = len(data) |
| str = data[:i] |
| i += 2 |
| while i < n and data[i] in ' \t': |
| i += 1 |
| return str + ' ' + data[i:] |
| |
| class TclLexer: |
| """TclLexer - Lex a string into "words", following the Tcl syntax.""" |
| |
| def __init__(self, data): |
| self.data = tcl_preprocess(data) |
| self.pos = 0 |
| self.end = len(self.data) |
| |
| def at_end(self): |
| return self.pos == self.end |
| |
| def eat(self): |
| c = self.data[self.pos] |
| self.pos += 1 |
| return c |
| |
| def look(self): |
| return self.data[self.pos] |
| |
| def maybe_eat(self, c): |
| """ |
| maybe_eat(c) - Consume the character c if it is the next character, |
| returning True if a character was consumed. """ |
| if self.data[self.pos] == c: |
| self.pos += 1 |
| return True |
| return False |
| |
| def escape(self, c): |
| if c == 'a': |
| return '\x07' |
| elif c == 'b': |
| return '\x08' |
| elif c == 'f': |
| return '\x0c' |
| elif c == 'n': |
| return '\n' |
| elif c == 'r': |
| return '\r' |
| elif c == 't': |
| return '\t' |
| elif c == 'v': |
| return '\x0b' |
| elif c in 'uxo': |
| raise ValueError,'Invalid quoted character %r' % c |
| else: |
| return c |
| |
| def lex_braced(self): |
| # Lex until whitespace or end of string, the opening brace has already |
| # been consumed. |
| |
| str = '' |
| while 1: |
| if self.at_end(): |
| raise ValueError,"Unterminated '{' quoted word" |
| |
| c = self.eat() |
| if c == '}': |
| break |
| elif c == '{': |
| str += '{' + self.lex_braced() + '}' |
| elif c == '\\' and self.look() in '{}': |
| str += self.eat() |
| else: |
| str += c |
| |
| return str |
| |
| def lex_quoted(self): |
| str = '' |
| |
| while 1: |
| if self.at_end(): |
| raise ValueError,"Unterminated '\"' quoted word" |
| |
| c = self.eat() |
| if c == '"': |
| break |
| elif c == '\\': |
| if self.at_end(): |
| raise ValueError,'Missing quoted character' |
| |
| str += self.escape(self.eat()) |
| else: |
| str += c |
| |
| return str |
| |
| def lex_unquoted(self, process_all=False): |
| # Lex until whitespace or end of string. |
| str = '' |
| while not self.at_end(): |
| if not process_all: |
| if self.look().isspace() or self.look() == ';': |
| break |
| |
| c = self.eat() |
| if c == '\\': |
| if self.at_end(): |
| raise ValueError,'Missing quoted character' |
| |
| str += self.escape(self.eat()) |
| elif c == '[': |
| raise NotImplementedError, ('Command substitution is ' |
| 'not supported') |
| elif c == '$' and not self.at_end() and (self.look().isalpha() or |
| self.look() == '{'): |
| raise NotImplementedError, ('Variable substitution is ' |
| 'not supported') |
| else: |
| str += c |
| |
| return str |
| |
| def lex_one_token(self): |
| if self.maybe_eat('"'): |
| return self.lex_quoted() |
| elif self.maybe_eat('{'): |
| # Check for argument substitution. |
| if not self.maybe_eat('*'): |
| return self.lex_braced() |
| |
| if not self.maybe_eat('}'): |
| return '*' + self.lex_braced() |
| |
| if self.at_end() or self.look().isspace(): |
| return '*' |
| |
| raise NotImplementedError, "Argument substitution is unsupported" |
| else: |
| return self.lex_unquoted() |
| |
| def lex(self): |
| while not self.at_end(): |
| c = self.look() |
| if c in ' \t': |
| self.eat() |
| elif c in ';\n': |
| self.eat() |
| yield (';',) |
| else: |
| yield self.lex_one_token() |
| |
| class TclExecCommand: |
| kRedirectPrefixes1 = ('<', '>') |
| kRedirectPrefixes2 = ('<@', '<<', '2>', '>&', '>>', '>@') |
| kRedirectPrefixes3 = ('2>@', '2>>', '>>&', '>&@') |
| kRedirectPrefixes4 = ('2>@1',) |
| |
| def __init__(self, args): |
| self.args = iter(args) |
| |
| def lex(self): |
| try: |
| return self.args.next() |
| except StopIteration: |
| return None |
| |
| def look(self): |
| next = self.lex() |
| if next is not None: |
| self.args = itertools.chain([next], self.args) |
| return next |
| |
| def parse_redirect(self, tok, length): |
| if len(tok) == length: |
| arg = self.lex() |
| if arg is None: |
| raise ValueError,'Missing argument to %r redirection' % tok |
| else: |
| tok,arg = tok[:length],tok[length:] |
| |
| if tok[0] == '2': |
| op = (tok[1:],2) |
| else: |
| op = (tok,) |
| return (op, arg) |
| |
| def parse_pipeline(self): |
| if self.look() is None: |
| raise ValueError,"Expected at least one argument to exec" |
| |
| commands = [Command([],[])] |
| while 1: |
| arg = self.lex() |
| if arg is None: |
| break |
| elif arg == '|': |
| commands.append(Command([],[])) |
| elif arg == '|&': |
| # Write this as a redirect of stderr; it must come first because |
| # stdout may have already been redirected. |
| commands[-1].redirects.insert(0, (('>&',2),'1')) |
| commands.append(Command([],[])) |
| elif arg[:4] in TclExecCommand.kRedirectPrefixes4: |
| commands[-1].redirects.append(self.parse_redirect(arg, 4)) |
| elif arg[:3] in TclExecCommand.kRedirectPrefixes3: |
| commands[-1].redirects.append(self.parse_redirect(arg, 3)) |
| elif arg[:2] in TclExecCommand.kRedirectPrefixes2: |
| commands[-1].redirects.append(self.parse_redirect(arg, 2)) |
| elif arg[:1] in TclExecCommand.kRedirectPrefixes1: |
| commands[-1].redirects.append(self.parse_redirect(arg, 1)) |
| else: |
| commands[-1].args.append(arg) |
| |
| return Pipeline(commands, False, pipe_err=True) |
| |
| def parse(self): |
| ignoreStderr = False |
| keepNewline = False |
| |
| # Parse arguments. |
| while 1: |
| next = self.look() |
| if not isinstance(next, str) or next[0] != '-': |
| break |
| |
| if next == '--': |
| self.lex() |
| break |
| elif next == '-ignorestderr': |
| ignoreStderr = True |
| elif next == '-keepnewline': |
| keepNewline = True |
| else: |
| raise ValueError,"Invalid exec argument %r" % next |
| |
| return (ignoreStderr, keepNewline, self.parse_pipeline()) |
| |
| ### |
| |
| import unittest |
| |
| class TestTclLexer(unittest.TestCase): |
| def lex(self, str, *args, **kwargs): |
| return list(TclLexer(str, *args, **kwargs).lex()) |
| |
| def test_preprocess(self): |
| self.assertEqual(tcl_preprocess('a b'), 'a b') |
| self.assertEqual(tcl_preprocess('a\\\nb c'), 'a b c') |
| |
| def test_unquoted(self): |
| self.assertEqual(self.lex('a b c'), |
| ['a', 'b', 'c']) |
| self.assertEqual(self.lex(r'a\nb\tc\ '), |
| ['a\nb\tc ']) |
| self.assertEqual(self.lex(r'a \\\$b c $\\'), |
| ['a', r'\$b', 'c', '$\\']) |
| |
| def test_braced(self): |
| self.assertEqual(self.lex('a {b c} {}'), |
| ['a', 'b c', '']) |
| self.assertEqual(self.lex(r'a {b {c\n}}'), |
| ['a', 'b {c\\n}']) |
| self.assertEqual(self.lex(r'a {b\{}'), |
| ['a', 'b{']) |
| self.assertEqual(self.lex(r'{*}'), ['*']) |
| self.assertEqual(self.lex(r'{*} a'), ['*', 'a']) |
| self.assertEqual(self.lex(r'{*} a'), ['*', 'a']) |
| self.assertEqual(self.lex('{a\\\n b}'), |
| ['a b']) |
| |
| def test_quoted(self): |
| self.assertEqual(self.lex('a "b c"'), |
| ['a', 'b c']) |
| |
| def test_terminators(self): |
| self.assertEqual(self.lex('a\nb'), |
| ['a', (';',), 'b']) |
| self.assertEqual(self.lex('a;b'), |
| ['a', (';',), 'b']) |
| self.assertEqual(self.lex('a ; b'), |
| ['a', (';',), 'b']) |
| |
| class TestTclExecCommand(unittest.TestCase): |
| def parse(self, str): |
| return TclExecCommand(list(TclLexer(str).lex())).parse() |
| |
| def test_basic(self): |
| self.assertEqual(self.parse('echo hello'), |
| (False, False, |
| Pipeline([Command(['echo', 'hello'], [])], |
| False, True))) |
| self.assertEqual(self.parse('echo hello | grep hello'), |
| (False, False, |
| Pipeline([Command(['echo', 'hello'], []), |
| Command(['grep', 'hello'], [])], |
| False, True))) |
| |
| def test_redirect(self): |
| self.assertEqual(self.parse('echo hello > a >b >>c 2> d |& e'), |
| (False, False, |
| Pipeline([Command(['echo', 'hello'], |
| [(('>&',2),'1'), |
| (('>',),'a'), |
| (('>',),'b'), |
| (('>>',),'c'), |
| (('>',2),'d')]), |
| Command(['e'], [])], |
| False, True))) |
| |
| if __name__ == '__main__': |
| unittest.main() |