blob: 8fbe69127dfda046fd728b917c1829dc3c620eab [file] [log] [blame]
#!/usr/bin/env fuchsia-vendored-python
# -*- coding: utf-8 -*-
# Copyright 2019 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
#### CATEGORY=Run, inspect and debug
### extract a snoop log from a snapshot as pcap formatted data
## Usage: fx bt-snoop-from-snapshot INPUT_SNAPSHOT.zip > OUTPUT.pcap
##
## INPUT_SNAPSHOT is a snapshot from a Fuchsia device, such as obtained with
## `ffx target snapshot`. The pcap output file can be opened with wireshark.
import argparse
from base64 import b64decode
import json
import struct
import sys
from zipfile import BadZipFile, ZipFile
# Global header that starts a libpcap capture file. The field layout is
# described in
# https://wiki.wireshark.org/Development/LibpcapFileFormat#Global_Header
# The values are packed big-endian (">"), one struct field per argument.
PCAP_HEADER = struct.pack(
    ">IHHiIII",
    0xA1B2C3D4,  # magic number
    2,  # major version
    4,  # minor version
    0,  # timezone: GMT
    0,  # sigfigs
    0xFFFF,  # max packet length (65535)
    201,  # protocol: BLUETOOTH_HCI_H4_WITH_PHDR
)
# Hint appended to error messages reported when the snapshot contents are not
# what this script expects.
BUG_MSG = (
    "Are you sure this is a valid snapshot zipfile? "
    "If so, please file a bug."
)
class ConversionError(Exception):
    """Raised when a snapshot cannot be converted into a snoop log.

    Attributes:
        reason: human-readable description of what went wrong.
    """

    def __init__(self, reason):
        # Forward the reason to Exception so that str(error), error.args and
        # tracebacks carry the message even when it is passed by keyword.
        super().__init__(reason)
        self.reason = reason
def exit_with_message(msg, code=1):
    """Write "msg" to stderr, flush it, then end the process with status "code"."""
    err_stream = sys.stderr
    err_stream.write(msg)
    err_stream.flush()
    raise SystemExit(code)
def convert_device_data(device):
    """
    Turn one device's inspect dict into pcap bytes.

    New inspect format: the dict has a "data" entry holding a single
    "b64:"-prefixed string, which is decoded and returned as is.
    Old inspect format: every "b64:"-prefixed string value is a chunk stored
    under an integer-like key; the chunks are decoded, ordered by that integer
    and appended to PCAP_HEADER.
    """
    b64_prefix = "b64:"
    prefix_len = len(b64_prefix)

    # New inspect format
    if "data" in device:
        return b64decode(device["data"][prefix_len:])

    # Old inspect format
    numbered_chunks = []
    for index, payload in device.items():
        if isinstance(payload, str) and payload.startswith(b64_prefix):
            numbered_chunks.append((int(index), payload))
    numbered_chunks.sort()

    pcap_parts = [PCAP_HEADER]
    for _, payload in numbered_chunks:
        pcap_parts.append(b64decode(payload[prefix_len:]))
    return b"".join(pcap_parts)
def find_snoop_root(inspect):
    """
    Pick the bt-snoop component out of the inspect data generated by snapshot.

    Returns the first entry of "inspect" whose "path" or "moniker" value
    contains a known bt-snoop moniker.

    Raises: ConversionError if no entry matches.
    """
    monikers = ["core/bt-snoop"]
    searched_fields = ("path", "moniker")
    for entry in inspect:
        matched = any(
            wanted in entry.get(field, "")
            for wanted in monikers
            for field in searched_fields)
        if matched:
            return entry
    raise ConversionError(
        "Input does not contain the inspect data for bt-snoop.")
def get_inspect_from_snapshot(filename):
    """Extract and parse inspect json data from a snapshot zip file.

    Args:
        filename: path to the snapshot zip file.

    Returns:
        The parsed json contents of the first archive member whose name ends
        with "inspect.json".

    Raises:
        ConversionError: if the archive cannot be opened, contains no such
            member, or the member cannot be parsed as json.
    """
    inspect_file = "inspect.json"
    try:
        with ZipFile(filename) as snapshot:
            # Members are matched by name suffix, so the file is found at
            # whatever path it has inside the archive.
            inspect_path = next(
                name for name in snapshot.namelist()
                if name.endswith(inspect_file))
            return json.loads(snapshot.read(inspect_path))
    except (IOError, BadZipFile) as e:
        raise ConversionError(
            'Failed to open snapshot zip file "{}":\n{}'.format(
                filename, e)) from e
    except StopIteration as e:
        # next() found no matching archive member.
        raise ConversionError(
            'Could not find "' + inspect_file + '" in snapshot zip file. ' +
            BUG_MSG) from e
    except KeyError as e:
        raise ConversionError(str(e) + " " + BUG_MSG) from e
    except ValueError as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        raise ConversionError(
            "Failed to parse inspect json data from file '" + inspect_file +
            "'. " + BUG_MSG) from e
def get_device_from_metrics(metrics, device_name):
    """
    Look up the data for the hci device called "device_name" in "metrics".

    Only entries whose key starts with "device_" are considered. When any of
    them has an "hci_device_name" field (new inspect format) devices are named
    by that field; otherwise (old inspect format) a device is named by the part
    of its key that follows the "device_" prefix.

    Raises: ConversionError if device_name not present in "metrics".
    """
    device_key_prefix = "device_"
    new_device_name_key = "hci_device_name"

    device_entries = [
        (key, entry) for key, entry in metrics.items()
        if key.startswith(device_key_prefix)
    ]

    # determine which inspect format is in use by looking at the device entries
    uses_new_format = any(
        new_device_name_key in entry for _, entry in device_entries)

    # map each device name to its unchanged entry
    if uses_new_format:
        devices = {
            entry[new_device_name_key]: entry for _, entry in device_entries
        }
    else:
        devices = {
            key[len(device_key_prefix):]: entry
            for key, entry in device_entries
        }

    if device_name in devices:
        return devices[device_name]
    known_names = ", ".join(devices)
    raise ConversionError(
        'Device not present: "' + device_name + '". Valid device names: ' +
        known_names)
def main():
    """Take a path to a fuchsia snapshot zip file and write binary pcap data to stdout"""
    # NOTE: the docstring above doubles as the argparse description shown by
    # --help, so it is kept as a single user-facing sentence.
    description = main.__doc__
    example = ("Example: fx bt-snoop-from-snapshot snapshot.zip | wireshark -k -i"
               " -")
    parser = argparse.ArgumentParser(description=description, epilog=example)
    parser.add_argument("path", help="path to snapshot zip file")
    parser.add_argument(
        "-o", "--outfile", help="write snoop log to a file instead of stdout")
    parser.add_argument(
        "device",
        nargs="?",
        default="000",
        help="name of the hci device file to output")
    args = parser.parse_args()

    # Every failure below is reported on stderr through exit_with_message(),
    # which ends the process with a non-zero status and never returns.
    try:
        result = get_inspect_from_snapshot(args.path)
    except ConversionError as e:
        exit_with_message(e.reason + "\n")

    try:
        component_inspect = find_snoop_root(result)
        # The component's inspect tree is looked up under "contents" when that
        # key is present and under "payload" otherwise.
        if "contents" in component_inspect:
            metrics = component_inspect["contents"]["root"]["runtime_metrics"]
        else:
            metrics = component_inspect["payload"]["root"]["runtime_metrics"]
    except ConversionError as e:
        exit_with_message(e.reason + "\n")
    except (IndexError, KeyError, TypeError):
        r = ("Inspect data for bt-snoop component does not conform to the "
             "expected data format. ")
        exit_with_message(r + BUG_MSG + "\n")

    try:
        device = get_device_from_metrics(metrics, args.device)
    except ConversionError as e:
        exit_with_message(e.reason + "\n")

    # Convert before touching the output so that a failed conversion neither
    # creates nor truncates the output file.
    try:
        pcap_data = convert_device_data(device)
    except (AttributeError, TypeError, ValueError) as e:
        # binascii.Error (malformed base64) is a ValueError subclass.
        exit_with_message(
            'Failed to decode snoop log data for device "' + args.device +
            '": ' + str(e) + "\n" + BUG_MSG + "\n")

    if args.outfile is not None:
        with open(args.outfile, "wb") as outfile:
            outfile.write(pcap_data)
    else:
        # write pcap binary data to stdout
        sys.stdout.buffer.write(pcap_data)
        sys.stdout.buffer.flush()
# Run the conversion only when this file is executed as a script.
if __name__ == "__main__":
    main()