# blob: dd4b584ac00b616d1ad5fc943652ccbeb4e432fb [file] [log] [blame]
import itertools
import Util
class ShLexer:
    """Tokenizer for a small subset of 'sh' command-line syntax.

    Iterating over lex() yields two kinds of tokens:

      * plain strings for arguments, with quotes and escapes resolved;
      * tuples for operators -- a 1-tuple such as ('|',) or ('>>',), or a
        2-tuple (op, fd) such as ('>', 2) when a redirection operator is
        directly preceded by an all-digit word ('2>').

    Operators are only recognised where a token starts or, for '|', '&',
    '<' and '>', where they end an argument.  ';' and '!' are operators only
    at the start of a token; inside a word they are ordinary characters.

    data is the text to tokenize.  When win32Escapes is true, a backslash
    outside of quotes is kept as a literal character instead of escaping the
    character that follows it.
    """

    def __init__(self, data, win32Escapes = False):
        self.data = data
        self.pos = 0
        self.end = len(data)
        self.win32Escapes = win32Escapes

    def eat(self):
        """Consume and return the next character."""
        c = self.data[self.pos]
        self.pos += 1
        return c

    def look(self):
        """Return the next character without consuming it."""
        return self.data[self.pos]

    def maybe_eat(self, c):
        """
        maybe_eat(c) - Consume the character c if it is the next character,
        returning True if a character was consumed. """
        # The bounds check matters: operators are allowed to be the very
        # last thing in the input (e.g. 'a |'), in which case there is no
        # next character to inspect.
        if self.pos < self.end and self.data[self.pos] == c:
            self.pos += 1
            return True
        return False

    def lex_arg_fast(self, c):
        """Try to lex an argument that needs no quote/escape processing.

        The first character of the argument (c) has already been consumed.
        Returns the argument string and advances past it, or returns None
        and leaves the position untouched if the whitespace-delimited chunk
        contains any character that needs the slow path.
        """
        # Get the leading whitespace free section.
        chunk = self.data[self.pos - 1:].split(None, 1)[0]

        # If it has special characters, the fast path failed.
        if any(special in chunk for special in '|&<>\'"\\'):
            return None

        self.pos = self.pos - 1 + len(chunk)
        return chunk

    def lex_arg_slow(self, c):
        """Lex an argument character by character.

        c is the already-consumed first character.  Returns the argument
        string, or a redirection tuple (op, fd) when an all-digit word is
        immediately followed by '<' or '>'.

        Note that a backslash that is the first character of the argument is
        taken literally; only backslashes seen later in the word are treated
        as escapes.
        """
        if c in "'\"":
            arg = self.lex_arg_quoted(c)
        else:
            arg = c
        while self.pos != self.end:
            c = self.look()
            if c.isspace() or c in "|&":
                break
            elif c in '><':
                # This is an annoying case; we treat '2>' as a single token so
                # we don't have to track whitespace tokens.

                # If the parse string isn't an integer, do the usual thing.
                if not arg.isdigit():
                    break

                # Otherwise, lex the operator and convert to a redirection
                # token.
                num = int(arg)
                tok = self.lex_one_token()
                assert isinstance(tok, tuple) and len(tok) == 1
                return (tok[0], num)
            elif c == '"' or c == "'":
                # A quoted section may start in the middle of a word; both
                # quote characters are honoured, matching what happens when
                # the word starts with a quote.
                self.eat()
                arg += self.lex_arg_quoted(c)
            elif not self.win32Escapes and c == '\\':
                # Outside of a string, '\\' escapes everything.
                self.eat()
                if self.pos == self.end:
                    Util.warning("escape at end of quoted argument in: %r" %
                                 self.data)
                    return arg
                arg += self.eat()
            else:
                arg += self.eat()
        return arg

    def lex_arg_quoted(self, delim):
        """Lex the remainder of a quoted section and return its contents.

        The opening quote has already been consumed; delim is the quote
        character that closes the section.  A warning is issued if the input
        ends before the closing quote is found.
        """
        text = ''
        while self.pos != self.end:
            c = self.eat()
            if c == delim:
                return text
            elif c == '\\' and delim == '"':
                # Inside a '"' quoted string, '\\' only escapes the quote
                # character and backslash, otherwise it is preserved.
                if self.pos == self.end:
                    Util.warning("escape at end of quoted argument in: %r" %
                                 self.data)
                    return text
                c = self.eat()
                if c == '"' or c == '\\':
                    text += c
                else:
                    text += '\\' + c
            else:
                text += c
        Util.warning("missing quote character in %r" % self.data)
        return text

    def lex_arg_checked(self, c):
        """Lex an argument with both paths and verify that they agree.

        Returns the slow-path result.  Raises ValueError if the fast path
        produced a result that differs from the slow path, either in value
        or in how much input was consumed.
        """
        pos = self.pos
        res = self.lex_arg_fast(c)
        end = self.pos

        # Rewind so the slow path starts from the same position.
        self.pos = pos
        reference = self.lex_arg_slow(c)
        if res is not None:
            if res != reference:
                raise ValueError("Fast path failure: %r != %r" %
                                 (res, reference))
            if self.pos != end:
                raise ValueError("Fast path failure: %r != %r" %
                                 (self.pos, end))
        return reference

    def lex_arg(self, c):
        """Lex one argument whose first character c was already consumed."""
        # The fast path never returns an empty string, so a falsy result
        # always means "fall back to the slow path".
        return self.lex_arg_fast(c) or self.lex_arg_slow(c)

    def lex_one_token(self):
        """
        lex_one_token - Lex a single 'sh' token. """
        c = self.eat()
        if c in ';!':
            return (c,)
        if c == '|':
            if self.maybe_eat('|'):
                return ('||',)
            return (c,)
        if c == '&':
            if self.maybe_eat('&'):
                return ('&&',)
            if self.maybe_eat('>'):
                return ('&>',)
            return (c,)
        if c == '>':
            if self.maybe_eat('&'):
                return ('>&',)
            if self.maybe_eat('>'):
                return ('>>',)
            return (c,)
        if c == '<':
            if self.maybe_eat('&'):
                return ('<&',)
            # '<<' is spelled with two '<' characters; '<>' is not folded
            # into it.
            if self.maybe_eat('<'):
                return ('<<',)
            return (c,)

        return self.lex_arg(c)

    def lex(self):
        """Generate the tokens of the input, skipping whitespace."""
        while self.pos != self.end:
            if self.look().isspace():
                self.eat()
            else:
                yield self.lex_one_token()
###
class Command:
    """A single command: its argument list plus its redirections.

    args is copied into a list of argument values (the first one is the
    command name when built by ShParser.parse_command).  redirects is copied
    into a list of redirection entries; ShParser.parse_command stores each
    one as an (operator-token, target) pair.
    """

    def __init__(self, args, redirects):
        self.args = list(args)
        self.redirects = list(redirects)

    def __repr__(self):
        return 'Command(%r, %r)' % (self.args, self.redirects)

    def __eq__(self, other):
        # Explicit rich comparison: Python 3 never calls __cmp__, so without
        # this '==' would silently fall back to identity.
        if not isinstance(other, Command):
            return False
        return ((self.args, self.redirects) ==
                (other.args, other.redirects))

    def __ne__(self, other):
        # Python 2 does not derive '!=' from __eq__.
        return not (self == other)

    def __cmp__(self, other):
        # Only consulted by Python 2 (for ordering comparisons).
        if not isinstance(other, Command):
            return -1
        return cmp((self.args, self.redirects),
                   (other.args, other.redirects))
class Pipeline:
    """A sequence of commands joined by '|', optionally negated with '!'.

    commands is stored as given (it is not copied); negate is the flag set
    when the pipeline was prefixed with '!'.
    """

    def __init__(self, commands, negate):
        self.commands = commands
        self.negate = negate

    def __repr__(self):
        return 'Pipeline(%r, %r)' % (self.commands, self.negate)

    def __eq__(self, other):
        # Explicit rich comparison: Python 3 never calls __cmp__, so without
        # this '==' would silently fall back to identity.
        if not isinstance(other, Pipeline):
            return False
        return ((self.commands, self.negate) ==
                (other.commands, other.negate))

    def __ne__(self, other):
        # Python 2 does not derive '!=' from __eq__.
        return not (self == other)

    def __cmp__(self, other):
        # Only consulted by Python 2 (for ordering comparisons).
        if not isinstance(other, Pipeline):
            return -1
        return cmp((self.commands, self.negate),
                   (other.commands, other.negate))
class Seq:
    """Two parsed nodes joined by a list operator.

    op must be one of ';', '&', '||' or '&&'; lhs and rhs are the nodes on
    either side of it.
    """

    def __init__(self, lhs, op, rhs):
        assert op in (';', '&', '||', '&&')
        self.op = op
        self.lhs = lhs
        self.rhs = rhs

    def __repr__(self):
        return 'Seq(%r, %r, %r)' % (self.lhs, self.op, self.rhs)

    def __eq__(self, other):
        # Explicit rich comparison: Python 3 never calls __cmp__, so without
        # this '==' would silently fall back to identity.
        if not isinstance(other, Seq):
            return False
        return ((self.lhs, self.op, self.rhs) ==
                (other.lhs, other.op, other.rhs))

    def __ne__(self, other):
        # Python 2 does not derive '!=' from __eq__.
        return not (self == other)

    def __cmp__(self, other):
        # Only consulted by Python 2 (for ordering comparisons).
        if not isinstance(other, Seq):
            return -1
        return cmp((self.lhs, self.op, self.rhs),
                   (other.lhs, other.op, other.rhs))
class ShParser:
    """Parser turning the token stream from ShLexer into a command tree.

    parse() returns a Pipeline, or nested Seq nodes when the input contains
    ';', '&', '||' or '&&'.  Syntax errors are reported as ValueError.

    data is the text to parse; win32Escapes is passed through to ShLexer.
    """

    def __init__(self, data, win32Escapes = False):
        self.data = data
        self.tokens = ShLexer(data, win32Escapes = win32Escapes).lex()

    def lex(self):
        """Consume and return the next token, or None at end of input."""
        # The builtin next() works on both Python 2.6+ and Python 3, unlike
        # the iterator's .next() method.
        return next(self.tokens, None)

    def look(self):
        """Return the next token without consuming it (None at the end)."""
        tok = self.lex()
        if tok is not None:
            # Push the token back in front of the remaining stream.
            self.tokens = itertools.chain([tok], self.tokens)
        return tok

    def parse_command(self):
        """Parse one command: its arguments and any redirections.

        Returns a Command.  Raises ValueError if there is no command to
        parse, if the command starts with an operator, or if a redirection
        operator is not followed by a target.
        """
        tok = self.lex()
        if not tok:
            raise ValueError("empty command!")
        if isinstance(tok, tuple):
            raise ValueError("syntax error near unexpected token %r" % tok[0])

        args = [tok]
        redirects = []
        while 1:
            tok = self.look()

            # EOF?
            if tok is None:
                break

            # If this is an argument, just add it to the current command.
            if isinstance(tok, str):
                args.append(self.lex())
                continue

            # Otherwise see if it is a terminator.
            assert isinstance(tok, tuple)
            if tok[0] in ('|',';','&','||','&&'):
                break

            # Otherwise it must be a redirection.
            op = self.lex()
            arg = self.lex()
            if not arg:
                raise ValueError("syntax error near token %r" % op[0])
            redirects.append((op, arg))

        return Command(args, redirects)

    def parse_pipeline(self):
        """Parse an optional '!' followed by commands separated by '|'.

        Returns a Pipeline.
        """
        negate = False
        if self.look() == ('!',):
            self.lex()
            negate = True

        commands = [self.parse_command()]
        while self.look() == ('|',):
            self.lex()
            commands.append(self.parse_command())
        return Pipeline(commands, negate)

    def parse(self):
        """Parse the whole input and return the resulting tree.

        Pipelines joined by list operators are folded left to right into
        Seq nodes.  Raises ValueError if an operator has nothing after it.
        """
        lhs = self.parse_pipeline()

        while self.look():
            operator = self.lex()
            assert isinstance(operator, tuple) and len(operator) == 1

            if not self.look():
                raise ValueError("missing argument to operator %r" %
                                 operator[0])

            # FIXME: Operator precedence!!
            lhs = Seq(lhs, operator[0], self.parse_pipeline())

        return lhs
###
import unittest
class TestShLexer(unittest.TestCase):
    """Unit tests for the token stream produced by ShLexer."""

    def lex(self, str, *args, **kwargs):
        """Tokenize str and return every token as a list."""
        lexer = ShLexer(str, *args, **kwargs)
        return list(lexer.lex())

    def test_basic(self):
        expected = ['a', ('|',), 'b', ('>',), 'c', ('&',), 'd',
                    ('<',), 'e']
        self.assertEqual(self.lex('a|b>c&d<e'), expected)

    def test_redirection_tokens(self):
        # A number only turns into a file descriptor when it is a whole word
        # directly in front of the redirection operator.
        cases = [
            ('a2>c', ['a2', ('>',), 'c']),
            ('a 2>c', ['a', ('>',2), 'c']),
        ]
        for text, expected in cases:
            self.assertEqual(self.lex(text), expected)

    def test_quoting(self):
        cases = [
            (""" 'a' """, ['a']),
            (""" "hello\\"world" """, ['hello"world']),
            (""" "hello\\'world" """, ["hello\\'world"]),
            (""" "hello\\\\world" """, ["hello\\world"]),
            (""" he"llo wo"rld """, ["hello world"]),
            (""" a\\ b a\\\\b """, ["a b", "a\\b"]),
            (""" "" "" """, ["", ""]),
        ]
        for text, expected in cases:
            self.assertEqual(self.lex(text), expected)

        # With win32Escapes a backslash outside quotes is kept literally.
        self.assertEqual(self.lex(""" a\\ b """, win32Escapes = True),
                         ['a\\', 'b'])
class TestShParse(unittest.TestCase):
    """Unit tests for the trees produced by ShParser.parse()."""

    def parse(self, str):
        """Parse str with default settings and return the resulting tree."""
        parser = ShParser(str)
        return parser.parse()

    def test_basic(self):
        cases = [
            ('echo hello', ['echo', 'hello']),
            ('echo ""', ['echo', '']),
        ]
        for text, args in cases:
            self.assertEqual(self.parse(text),
                             Pipeline([Command(args, [])], False))

    def test_redirection(self):
        echo_args = ['echo', 'hello']

        one = Command(echo_args, [(('>',), 'c')])
        self.assertEqual(self.parse('echo hello > c'),
                         Pipeline([one], False))

        two = Command(echo_args, [(('>',), 'c'), (('>>',), 'd')])
        self.assertEqual(self.parse('echo hello > c >> d'),
                         Pipeline([two], False))

    def test_pipeline(self):
        def bare(name):
            # A command with a single argument and no redirections.
            return Command([name], [])

        self.assertEqual(self.parse('a | b'),
                         Pipeline([bare('a'), bare('b')], False))
        self.assertEqual(self.parse('a | b | c'),
                         Pipeline([bare('a'), bare('b'), bare('c')], False))
        self.assertEqual(self.parse('! a'),
                         Pipeline([bare('a')], True))

    def test_list(self):
        def single(name):
            # A non-negated pipeline made of one argument-less command.
            return Pipeline([Command([name], [])], False)

        for op in (';', '&', '&&', '||'):
            self.assertEqual(self.parse('a %s b' % op),
                             Seq(single('a'), op, single('b')))

        # Operators associate left to right.
        self.assertEqual(self.parse('a && b || c'),
                         Seq(Seq(single('a'), '&&', single('b')),
                             '||',
                             single('c')))
# Run the unit tests in this file when it is executed as a script.
if __name__ == '__main__':
    unittest.main()