blob: 65280d29d116dd1a4e7317254f3faf1cd955504c [file] [log] [blame]
#!/usr/bin/python
#
# Low-level QEMU shell on top of QMP.
#
# Copyright (C) 2009, 2010 Red Hat Inc.
#
# Authors:
# Luiz Capitulino <lcapitulino@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2. See
# the COPYING file in the top-level directory.
#
# Usage:
#
# Start QEMU with:
#
# # qemu [...] -qmp unix:./qmp-sock,server
#
# Run the shell:
#
# $ qmp-shell ./qmp-sock
#
# Commands have the following format:
#
# < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
#
# For example:
#
# (QEMU) device_add driver=e1000 id=net1
# {"return": {}}
# (QEMU)
import qmp
import json
import ast
import readline
import sys
import pprint
class QMPCompleter(list):
    """A list of completion candidates that doubles as a readline completer."""

    def complete(self, text, state):
        """
        Return the state-th stored entry that starts with text.

        Entries are considered in list order and state counts from zero.
        None is returned when fewer than state + 1 entries match.
        """
        candidates = [name for name in self if name.startswith(text)]
        if 0 <= state < len(candidates):
            return candidates[state]
        return None
class QMPShellError(Exception):
    """Base class for the errors raised by the shell itself."""
class QMPShellBadPort(QMPShellError):
    """Raised when the port part of a host:port address is not an integer."""
class FuzzyJSON(ast.NodeTransformer):
    """
    AST transformer that renames the JSON literals true/false/null to the
    Python names True/False/None so that Python can evaluate them.
    """

    def visit_Name(self, node):
        """Rename node in place if it is a JSON literal name; return it."""
        python_name = {
            'true': 'True',
            'false': 'False',
            'null': 'None',
        }.get(node.id)
        if python_name is not None:
            node.id = python_name
        return node
# TODO: QMPShell's interface is a bit ugly (eg. _fill_completion() and
# _execute_cmd()). Let's design a better one.
class QMPShell(qmp.QEMUMonitorProtocol):
    """
    Line-oriented shell that turns

        < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]

    into QMP command objects, sends them and prints the replies.

    A "transaction(" token opens a sub-shell in which every line is cached
    as one transaction action; a closing ")" sends all cached actions as a
    single 'transaction' command.
    """

    def __init__(self, address, pp=None):
        """
        @param address: "host:port" string or UNIX socket path
        @param pp: pretty-print switch; any value other than None makes
                   _print() emit indented JSON
        @raise QMPShellBadPort: if the port of a host:port address is not
                                an integer
        """
        qmp.QEMUMonitorProtocol.__init__(self, self.__get_address(address))
        self._greeting = None
        self._completer = None
        self._pp = pp
        self._transmode = False
        self._actions = list()
        # Defined here so that _execute_cmd() also works when
        # set_verbosity() has never been called.
        self._verbose = False

    def __get_address(self, arg):
        """
        Figure out if the argument is in the host:port form, if it's not it's
        probably a file path.

        @return a (host, port) tuple, or arg unchanged for a socket path
        @raise QMPShellBadPort: if the port part is not an integer
        """
        addr = arg.split(':')
        if len(addr) == 2:
            try:
                port = int(addr[1])
            except ValueError:
                raise QMPShellBadPort
            return (addr[0], port)
        # socket path
        return arg

    def _fill_completion(self):
        """Add every name reported by 'query-commands' to the completer."""
        for cmd in self.cmd('query-commands')['return']:
            self._completer.append(cmd['name'])

    def __completer_setup(self):
        """Create the completer, fill it and register it with readline."""
        self._completer = QMPCompleter()
        self._fill_completion()
        readline.set_completer(self._completer.complete)
        readline.parse_and_bind("tab: complete")
        # XXX: default delimiters conflict with some command names (eg. query-),
        # clearing everything as it doesn't seem to matter
        readline.set_completer_delims('')

    def __parse_value(self, val):
        """
        Convert the textual right-hand side of key=value into a Python value.

        Tried in order: integer, true/false (any case), and - for text
        starting with '{' or '[' - strict JSON followed by a Python literal
        in which true/false/null are accepted. Anything else is returned as
        the unmodified string.
        """
        try:
            return int(val)
        except ValueError:
            pass

        lowered = val.lower()
        if lowered == 'true':
            return True
        if lowered == 'false':
            return False

        if val.startswith(('{', '[')):
            # Try first as pure JSON:
            try:
                return json.loads(val)
            except ValueError:
                pass
            # Try once again as FuzzyJSON:
            try:
                st = ast.parse(val, mode='eval')
                return ast.literal_eval(FuzzyJSON().visit(st))
            except (SyntaxError, ValueError):
                pass
        return val

    def __cli_expr(self, tokens, parent):
        """
        Store every key=value token into the dict parent.

        Dots in a key address nested dicts ("a.b=1" yields {'a': {'b': 1}}),
        which are created on demand.

        @raise QMPShellError: for a token without a value, a key used both
                              as leaf and as non-leaf, or a key set twice
        """
        for arg in tokens:
            (key, _, val) = arg.partition('=')
            if not val:
                raise QMPShellError("Expected a key=value pair, got '%s'" % arg)

            value = self.__parse_value(val)
            optpath = key.split('.')
            leaf = optpath[-1]

            # Always descend from the root: reusing the position reached by
            # the previous token would nest unrelated keys inside its dict.
            node = parent
            curpath = []
            for p in optpath[:-1]:
                curpath.append(p)
                d = node.get(p, {})
                if not isinstance(d, dict):
                    raise QMPShellError('Cannot use "%s" as both leaf and non-leaf key' % '.'.join(curpath))
                node[p] = d
                node = d

            if leaf in node:
                if isinstance(node[leaf], dict):
                    # Report the full key; curpath stops short of the leaf.
                    raise QMPShellError('Cannot use "%s" as both leaf and non-leaf key' % key)
                raise QMPShellError('Cannot set "%s" multiple times' % key)
            node[leaf] = value

    def __build_cmd(self, cmdline):
        """
        Build a QMP input object from a user provided command-line in the
        following format:

            < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]

        @return the command dict, or None when there is nothing to send
                (blank input, or an action that was only cached while in
                transaction mode)
        @raise QMPShellError: on malformed input
        """
        cmdargs = cmdline.split()

        # Blank or whitespace-only input: must be checked before cmdargs[0]
        # is looked at.
        if not cmdargs:
            return None

        # Transactional CLI entry/exit:
        if cmdargs[0] == 'transaction(':
            self._transmode = True
            cmdargs.pop(0)
        elif cmdargs[0] == ')' and self._transmode:
            self._transmode = False
            if len(cmdargs) > 1:
                raise QMPShellError("Unexpected input after close of Transaction sub-shell")
            qmpcmd = { 'execute': 'transaction',
                       'arguments': { 'actions': self._actions } }
            self._actions = list()
            return qmpcmd

        # Nothing left after the "transaction(" token?
        if not cmdargs:
            return None

        # Parse and then cache this Transactional Action
        if self._transmode:
            finalize = False
            action = { 'type': cmdargs[0], 'data': {} }
            if cmdargs[-1] == ')':
                cmdargs.pop(-1)
                finalize = True
            self.__cli_expr(cmdargs[1:], action['data'])
            self._actions.append(action)
            return self.__build_cmd(')') if finalize else None

        # Standard command: parse and return it to be executed.
        qmpcmd = { 'execute': cmdargs[0], 'arguments': {} }
        self.__cli_expr(cmdargs[1:], qmpcmd['arguments'])
        return qmpcmd

    def _print(self, qmp):
        """
        Print a QMP object as JSON.

        The output is indented by four spaces when pretty-printing was
        requested (pp is not None), and is a single line otherwise.
        """
        # Pretty-printing has to happen during serialisation; handing the
        # finished JSON string to pprint would only print its repr.
        indent = 4 if self._pp is not None else None
        print(json.dumps(qmp, indent=indent))

    def _execute_cmd(self, cmdline):
        """
        Parse cmdline, send the resulting command and print the reply.

        @return False if the peer disconnected, True otherwise (including
                parse errors and lines that produced nothing to send)
        """
        try:
            qmpcmd = self.__build_cmd(cmdline)
        except Exception as e:
            print('Error while parsing command line: %s' % e)
            print('command format: <command-name>  [arg-name1=arg1] ... [arg-nameN=argN]')
            return True
        # For transaction mode, we may have just cached the action:
        if qmpcmd is None:
            return True
        if self._verbose:
            self._print(qmpcmd)
        resp = self.cmd_obj(qmpcmd)
        if resp is None:
            print('Disconnected')
            return False
        self._print(resp)
        return True

    def connect(self):
        """Connect, remember the greeting and set up tab completion."""
        self._greeting = qmp.QEMUMonitorProtocol.connect(self)
        self.__completer_setup()

    def show_banner(self, msg='Welcome to the QMP low-level shell!'):
        """Print msg and the QEMU version taken from the greeting."""
        print(msg)
        version = self._greeting['QMP']['version']['qemu']
        print('Connected to QEMU %d.%d.%d\n' % (version['major'],version['minor'],version['micro']))

    def get_prompt(self):
        """Return the prompt for the current mode."""
        if self._transmode:
            return "TRANS> "
        return "(QEMU) "

    def read_exec_command(self, prompt):
        """
        Read and execute a command.

        An empty line prints and clears the pending events instead.

        @return True if execution was ok, return False if disconnected.
        """
        try:
            cmdline = raw_input(prompt)
        except EOFError:
            print('')
            return False
        if cmdline == '':
            for ev in self.get_events():
                print(ev)
            self.clear_events()
            return True
        else:
            return self._execute_cmd(cmdline)

    def set_verbosity(self, verbose):
        """When verbose is true, each command is printed before it is sent."""
        self._verbose = verbose
class HMPShell(QMPShell):
    """
    Human monitor (HMP) shell: every input line is passed to QEMU through
    the QMP command 'human-monitor-command'.
    """

    def __init__(self, address):
        """@param address: "host:port" string or UNIX socket path"""
        QMPShell.__init__(self, address)
        self.__cpu_index = 0

    def __cmd_completion(self):
        """Fill the completer from the output of the HMP 'help' command."""
        for cmd in self.__cmd_passthrough('help')['return'].split('\r\n'):
            if cmd and cmd[0] != '[' and cmd[0] != '\t':
                words = cmd.split()
                if not words:
                    # whitespace-only line
                    continue
                name = words[0] # drop help text
                if name == 'info':
                    continue
                if '|' in name:
                    # Command in the form 'foobar|f' or 'f|foobar', take the
                    # full name
                    opt = name.split('|')
                    if len(opt[0]) == 1:
                        name = opt[1]
                    else:
                        name = opt[0]
                self._completer.append(name)
                self._completer.append('help ' + name) # help completion

    def __info_completion(self):
        """Fill the completer from the output of the HMP 'info' command."""
        for cmd in self.__cmd_passthrough('info')['return'].split('\r\n'):
            words = cmd.split()
            # The sub-command name is the second word; skip shorter lines
            # instead of failing on them.
            if len(words) > 1:
                self._completer.append('info ' + words[1])

    def __other_completion(self):
        """Add the completions that neither listing provides."""
        # special cases
        self._completer.append('help info')

    def _fill_completion(self):
        """Populate the completer with HMP commands instead of QMP ones."""
        self.__cmd_completion()
        self.__info_completion()
        self.__other_completion()

    def __cmd_passthrough(self, cmdline, cpu_index = 0):
        """Send cmdline via 'human-monitor-command' and return the reply."""
        return self.cmd_obj({ 'execute': 'human-monitor-command', 'arguments':
                              { 'command-line': cmdline,
                                'cpu-index': cpu_index } })

    def _execute_cmd(self, cmdline):
        """
        Pass cmdline to the human monitor and print its output.

        "cpu <index>" is trapped to remember the index, which is then sent
        along with every later command.

        @return False if the peer disconnected, True otherwise
        """
        words = cmdline.split()
        if not words:
            # Whitespace-only input: nothing to send.
            return True

        if words[0] == "cpu":
            # trap the cpu command, it requires special setting
            try:
                idx = int(words[1])
            except (IndexError, ValueError):
                # missing or non-numeric index
                print('cpu command takes an integer argument')
                return True
            # Probe the index with a harmless command before adopting it.
            probe = self.__cmd_passthrough('info version', idx)
            if probe is None:
                print('Disconnected')
                return False
            if 'return' not in probe:
                print('bad CPU index')
                return True
            self.__cpu_index = idx

        resp = self.__cmd_passthrough(cmdline, self.__cpu_index)
        if resp is None:
            print('Disconnected')
            return False
        assert 'return' in resp or 'error' in resp
        if 'return' in resp:
            # Success
            if len(resp['return']) > 0:
                # Written verbatim, without adding a newline.
                sys.stdout.write(resp['return'])
        else:
            # Error
            print('%s: %s' % (resp['error']['class'], resp['error']['desc']))
        return True

    def show_banner(self):
        """Print the HMP welcome message and the QEMU version."""
        QMPShell.show_banner(self, msg='Welcome to the HMP shell!')
def die(msg):
    """Write 'ERROR: <msg>' to stderr and exit with status 1."""
    report = 'ERROR: %s\n' % msg
    sys.stderr.write(report)
    raise SystemExit(1)
def fail_cmdline(option=None):
    """
    Print the usage line to stderr and exit with status 1.

    When option is given (and not empty) it is reported as the offending
    command-line option first.
    """
    if option:
        sys.stderr.write('ERROR: bad command-line option \'%s\'\n' % option)
    # The tool is called qmp-shell (see the usage notes in the file header).
    sys.stderr.write('qmp-shell [ -v ] [ -p ] [ -H ] < UNIX socket path> | < TCP address:port >\n')
    sys.exit(1)
def main():
    """
    Parse the command line, connect to the monitor and run the shell loop.

    Options: -H selects the HMP shell (it must precede the address), -p
    enables pretty-printing, -v enables verbose mode. The only other
    argument is the socket path or host:port address; a second one is
    rejected.
    """
    addr = ''
    shell = None
    use_hmp = False
    printer = None
    verbose = False

    try:
        for arg in sys.argv[1:]:
            if arg == "-v":
                verbose = True
                continue
            if arg == "-p":
                if printer is not None:
                    fail_cmdline(arg)
                printer = pprint.PrettyPrinter(indent=4)
                continue
            if arg == "-H":
                if shell is not None:
                    fail_cmdline(arg)
                use_hmp = True
                continue
            # Anything else is the monitor address.
            if shell is not None:
                fail_cmdline(arg)
            shell = HMPShell(arg) if use_hmp else QMPShell(arg, printer)
            addr = arg

        if shell is None:
            fail_cmdline()
    except QMPShellBadPort:
        die('bad port number in command-line')

    try:
        shell.connect()
    except qmp.QMPConnectError:
        die('Didn\'t get QMP greeting message')
    except qmp.QMPCapabilitiesError:
        die('Could not negotiate capabilities')
    except shell.error:
        die('Could not connect to %s' % addr)

    shell.show_banner()
    shell.set_verbosity(verbose)
    # Keep reading commands until the shell reports that it should stop.
    while True:
        if not shell.read_exec_command(shell.get_prompt()):
            break
    shell.close()
# Start the shell only when the file is executed as a script.
if __name__ == "__main__":
    main()