blob: ddae84e07045fd6273ed64a0e06455eeb89ae048 [file] [log] [blame]
#!/usr/bin/env python3
# Copyright 2019 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
#### CATEGORY=Run, inspect and debug
### packet capture and display tool
##
## usage:
## $ fx sniff wlan capture packets over WLAN interface,
## and show their summaries
## $ fx sniff --view hex eth capture packets over Ethernet interface,
## and show their hexadecimal dump
## $ fx sniff --view wireshark wlan capture packets over WLAN interface,
## and start wireshark GUI for display
## $ fx sniff --file myfile eth capture packets and store
## at //out/myfile.pcapng
## $ fx sniff -t 10 wlan capture for 10 sec
## $ fx sniff --help show all command line options
"""Fuchsia packet capture and display tool.
fx sniff captures the packets flowing in and out of the Fuchsia target device
and displays them in a useful view. It is meant to be run on the development
host.

fx sniff automatically filters out fx-workflow related packets such as ssh,
package server, logs, zxdb, etc., so that the user can focus on the application
of interest. For those who need to debug the fx workflow itself, the full
command in use is also printed, so it can easily be modified to meet their
own needs.
[Typical usages]
$ fx sniff wlan # capture packets over WLAN interface,
# and show their summaries
$ fx sniff --view hex eth # capture packets over Ethernet interface,
# and show their hexadecimal dump
$ fx sniff --view wireshark wlan # capture packets over WLAN interface,
# and start wireshark GUI for display
$ fx sniff --file myfile eth # capture packets and store
# at //out/myfile.pcapng
$ fx sniff -t 10 wlan # capture for 10 sec
$ fx sniff --help # show all command line options
"""
import argparse
import fcntl
import os
import subprocess
import sys
import termios
import time
# Name of the wireshark GUI binary that is searched for on the host PATH.
LINUX_BIN_WIRESHARK = u"wireshark"
# NOTE(review): not referenced anywhere in the visible part of this file.
LINUX_BIN_TSHARK = u"tshark"
# Directory prefix for capture files on the target. It is joined to the file
# name by plain string concatenation, so the trailing slash is required.
TARGET_TMP_DIR = u"/tmp/"
def has_cmd(binary_name):
    """Test if an executable named binary_name can be found on PATH.

    Args:
        binary_name: file name of the executable to look for.

    Returns:
        True if a directory listed in PATH holds an executable regular file
        with that name. False otherwise, including when PATH is unset or
        empty.
    """
    # A missing PATH means nothing can be found; do not raise KeyError.
    path_value = os.environ.get("PATH")
    if not path_value:
        return False
    for directory in path_value.split(os.pathsep):
        candidate = os.path.join(directory, binary_name)
        # os.access(..., X_OK) is also true for a searchable directory, so
        # additionally require a regular file.
        if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
            return True
    return False
def has_wireshark_env():
    """Test if wireshark GUI can run.

    The check covers the host operating system (Linux only) and the presence
    of the wireshark binary on PATH. The uname tuple is echoed to stdout.

    Returns:
        (True, "") if wireshark can run in the environment.
        (False, Error_String) otherwise.
    """
    platform = os.uname()
    print(platform)
    # Test emptiness by truthiness: `len(...) is 0` relies on int identity and
    # raises a SyntaxWarning on Python 3.8+.
    if not platform:
        return False, u"Failed to get uname"
    # platform[0] is the operating system name.
    if platform[0].lower() != u"linux":
        return False, u"Supported only on Linux"
    if not has_cmd(LINUX_BIN_WIRESHARK):
        return False, u"can\'t find %s" % (LINUX_BIN_WIRESHARK)
    # All look good.
    return True, u""
def run_cmd(cmd):
    """Execute a command without a shell and hand back what it wrote to stdout.

    Args:
        cmd: a command line string; it is split on whitespace to form argv.

    Returns:
        The captured stdout of the command, decoded to str. stderr is
        captured and dropped, and the exit status is not inspected.
    """
    argv = cmd.split()
    completed = subprocess.run(
        argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    captured = completed.stdout
    return captured.decode()
def can_run_cmd(cmd):
    """Test if the environment can run cmd.

    Args:
        cmd: a command line string; it is split on whitespace to form argv.

    Returns:
        True if the command ran and exited with status 0.
        False if any exception was caught: non-zero exit status, missing
        binary, empty command line, etc.
    """
    try:
        # Discard the output instead of piping it: check_call() never reads
        # from a pipe, so a chatty child could block forever on a full pipe.
        subprocess.check_call(
            cmd.split(), stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    except Exception:
        # Deliberately broad: every kind of failure means "cannot run".
        return False
    return True
def invoke_shell(cmd):
    """Run a command string through the shell and block until it exits.

    shell=True is used so that the command may include shell escape
    sequences. The command is echoed to stdout before it is started.

    Args:
        cmd: command string, which may include escape sequence.
    """
    banner = u"Invoke shell:" + cmd
    print(banner)
    subprocess.Popen(cmd, shell=True).wait()
def get_interface_names():
    """Collect the target's network interface names, leaving out loopback.

    The names are parsed from the output of `fx shell net if list`: every
    line that mentions "name" and consists of exactly two whitespace-separated
    tokens contributes its second token.

    Returns:
        A list of interface names.
    """
    listing = run_cmd(u"fx shell net if list")
    found = []
    for line in listing.split(u"\n"):
        fields = line.split()
        # Keep only two-token lines that carry the name field.
        if u"name" in line and len(fields) == 2:
            candidate = fields[1].strip()
            # sniffing on loopback interface is not supported
            if candidate != u"lo":
                found.append(candidate)
    return found
def get_interface_filepath(interface_name):
    """Look up the file paths the target reports for an interface name.

    The paths are parsed from the output of
    `fx shell net if list <interface_name>`: every line that mentions
    "filepath" and consists of exactly two whitespace-separated tokens
    contributes its second token.

    Args:
        interface_name: interface name.

    Returns:
        A list of the interface file paths.
    """
    listing = run_cmd(u"fx shell net if list %s" % interface_name)
    found = []
    for line in listing.split(u"\n"):
        fields = line.split()
        # Keep only two-token lines that carry the filepath field.
        if u"filepath" in line and len(fields) == 2:
            candidate = fields[1].strip()
            # "[none]" (any letter case) is skipped; presumably it marks an
            # interface without a file path, e.g. loopback - TODO confirm.
            if candidate.lower() != u"[none]":
                found.append(candidate)
    return found
def has_fuzzy_name(name_under_test, names):
    """Test if name_under_test occurs as a substring of any entry in names.

    Args:
        name_under_test: the (possibly partial) name to look for.
        names: an iterable of candidate names.

    Returns:
        True if at least one candidate contains name_under_test.
        False otherwise, and always False for an empty name_under_test.
    """
    if name_under_test:
        return any(name_under_test in candidate for candidate in names)
    return False
def build_cmd(args):
    """Build cmd line for sniffing and displaying.

    Args:
        args: parsed command line arguments. The attributes timeout,
            interface_filepath, file and view are read.

    Returns:
        cmd string, or None if the wireshark view was requested but the
        environment cannot run wireshark (the reason is printed).
    """
    fx_workflow_filter = (
        u"not ( "
        u"port ssh or dst port 8083 or dst port 2345 or port 1900 "
        u"or ip6 dst port 33330-33341 or ip6 dst port 33337-33338"
        u" )")
    # Pay special attention to the escape sequence
    # This command goes through the host shell, and ssh shell.
    cmd_prefix = u"fx shell sh -c '\"netdump -t %s -f \\\"%s\\\" " % (
        args.timeout, fx_workflow_filter)
    cmd_suffix = u" %s\"'" % (args.interface_filepath)
    cmd_options = u""

    # Build more options
    if args.file:
        # The capture is first written on the target.
        full_path = u"%s%s" % (TARGET_TMP_DIR, args.file)
        cmd_options += u"-w %s " % full_path

    if args.view == u"hex":
        cmd_options += u"--hexdump"
    elif args.view == u"wireshark":
        (result, err_str) = has_wireshark_env()
        if not result:
            msg = (
                u"Does not have a working wireshark environment. "
                u"Note it requires graphical environment "
                u"such as X Display: %s" % err_str)
            print(msg)
            # Explicit failure marker; the caller must not run a command.
            return None
        cmd_options += u"--pcapdump"
        # The pcap stream from the target is piped into wireshark on the host.
        cmd_suffix += u" | wireshark -k -i -"
    # The u"summary" view adds no option: it is the default behavior.

    return cmd_prefix + cmd_options + cmd_suffix
def get_keystroke_unblocking():
    """Returns a keystroke in a non-blocking way.

    stdin is switched to non-canonical, no-echo, non-blocking mode for the
    duration of a single one-character read, then restored.

    Returns:
        A one-character string if a key was pending on stdin. An empty string
        if nothing was pending, if stdin is not a usable terminal, or if the
        read failed.
    """
    try:
        fd = sys.stdin.fileno()
        attr_org = termios.tcgetattr(fd)
        flags_org = fcntl.fcntl(fd, fcntl.F_GETFL)
    except (AttributeError, OSError, ValueError, termios.error):
        # stdin is closed, redirected or not a terminal: no key to report.
        return ""

    attr_new = attr_org[:]
    # Index 3 holds the local-mode flags: turn off line buffering and echo.
    attr_new[3] = attr_new[3] & ~termios.ICANON & ~termios.ECHO
    flags_new = flags_org | os.O_NONBLOCK

    # Bound up front so a failed read cannot leave `key` undefined.
    key = ""
    try:
        termios.tcsetattr(fd, termios.TCSANOW, attr_new)
        fcntl.fcntl(fd, fcntl.F_SETFL, flags_new)
        key = sys.stdin.read(1) or ""
    except (OSError, TypeError, ValueError, termios.error):
        # Best effort: a read with no pending input may raise; treat it as
        # "no key". KeyboardInterrupt is intentionally not swallowed.
        pass
    finally:
        # Always put the terminal back the way it was found.
        termios.tcsetattr(fd, termios.TCSAFLUSH, attr_org)
        fcntl.fcntl(fd, fcntl.F_SETFL, flags_org)
    return key
def do_sniff(cmd):
    """Start the sniffer through the shell and poll until it ends.

    While the sniffer runs, the keyboard is polled; pressing q, c, Q or C
    kills netdump on the target and terminates the local process.

    Args:
        cmd: command string, which may include escape sequence.
    """
    print("Run: {}".format(cmd))
    stop_keys = ("q", "c", "Q", "C")
    sniffer = subprocess.Popen(cmd, shell=True)
    while True:
        if sniffer.poll() is not None:
            break
        time.sleep(0.07)  # To tame the CPU cycle consumption
        pressed = get_keystroke_unblocking()
        if pressed not in stop_keys:
            continue
        print(" ... forced stop by user ({})".format(pressed))
        run_cmd("fx shell killall netdump")
        sniffer.terminate()
def move_out_dumpfile(filename):
    """Move the PCAPNG dump file from the target device to the host device.

    The file is copied into the directory named by the FUCHSIA_OUT_DIR
    environment variable and removed from the target only after the copy
    succeeded.

    Args:
        filename: filename stored in the target. Empty string if no file was
            stored.
    """
    if not filename:
        return
    full_path = u"%s%s" % (TARGET_TMP_DIR, filename)
    out_dir = os.environ.get(u"FUCHSIA_OUT_DIR")
    if not out_dir:
        # Without a destination there is nothing safe to do; keep the capture.
        print(
            u"FUCHSIA_OUT_DIR is not set; leaving %s on the target" % full_path)
        return
    steps = [
        u"cd %s" % out_dir,
        u"fx scp \"[$(fx get-device-addr)]:%s\" ." % full_path,
        u"fx shell rm -rf %s" % full_path,
    ]
    # Chain with && so that a failed cd or copy never deletes the capture.
    invoke_shell(u" && ".join(steps))
def is_target_ready():
    """Tests if the target Fuchsia device is ready to capture packets.

    Two probes are run: `fx shell exit` for reachability and
    `fx shell which netdump` for the presence of netdump. A diagnostic is
    printed for the first probe that fails.

    Returns:
        True if the target is ready. False otherwise.
    """
    if not can_run_cmd("fx shell exit"):
        print("failed to run: the target device unreachable by 'fx shell'")
        return False
    if not can_run_cmd("fx shell which netdump"):
        msg = ("failed to run: the target does not have 'netdump'. "
               "Build with '--with-base //src/connectivity/network/netdump' "
               "and reload the target")
        print(msg)
        return False
    return True
def main():
    """Parse the command line, run the capture, and fetch the dump file.

    The process always ends through sys.exit(): status 1 if the target is not
    ready, the interface cannot be resolved or no command could be built;
    status 0 after a capture.
    """
    if not is_target_ready():
        sys.exit(1)

    # Queried before parsing so that the help text can list the names.
    iface_names = get_interface_names()
    iface_choices = u" ".join(iface_names)

    parser = argparse.ArgumentParser(
        description=u"Capture packets on the target, Display on the host")
    parser.add_argument(
        u"interface_name",
        nargs=u"?",
        default=u"",
        help=u"Choose one interface name from: %s" % iface_choices)
    parser.add_argument(
        u"-t",
        u"--timeout",
        type=int,
        default=30,
        help=u"Time duration to sniff, in seconds")
    parser.add_argument(
        u"--view",
        nargs=u"?",
        choices=[u"wireshark", u"hex", u"summary"],
        default=u"summary",
        const=u"summary",
        help=u"Wireshark requires X Display GUI environment.")
    parser.add_argument(
        u"--file",
        type=str,
        default=u"",
        help=
        u"Store PCAPNG file in //out directory. May use with --view option")
    parser.add_argument(
        u"--iface_filepath",
        dest=u"iface_filepath",
        default=u"",
        help=u"Specify the interface filepath directly. For advanced users.")
    args = parser.parse_args()

    # Sanitize the file name
    if args.file and not args.file.endswith(u".pcapng"):
        args.file = args.file + ".pcapng"

    if args.iface_filepath:
        # An explicitly given file path takes precedence over the name lookup.
        args.interface_filepath = args.iface_filepath
    else:
        if not has_fuzzy_name(args.interface_name, iface_names):
            print(u"Choose one interface name from: %s" % iface_choices)
            sys.exit(1)
        iface_file_paths = get_interface_filepath(args.interface_name)
        if len(iface_file_paths) != 1:
            msg = (
                u"Querying interface name |%s| yielded non-unique result: %s" %
                (args.interface_name, u" ".join(iface_file_paths)))
            print(msg)
            sys.exit(1)
        args.interface_filepath = iface_file_paths[0]

    cmd = build_cmd(args)
    if not cmd:
        # build_cmd() already printed why no command could be built.
        sys.exit(1)
    do_sniff(cmd)
    print(u"\nEnd of fx sniff")
    move_out_dumpfile(args.file)
    sys.exit(0)


if __name__ == u"__main__":
    sys.exit(main())