blob: 516e7f05f52b5c516338f2369772c304595217a5 [file] [log] [blame]
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2019 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
### Extract a snoop log from a snapshot as pcap-formatted data.
import argparse
from base64 import b64decode
import json
import struct
import sys
from zipfile import BadZipFile, ZipFile
# pcap global header, emitted once ahead of the packet data. Field layout is
# described in
# https://wiki.wireshark.org/Development/LibpcapFileFormat#Global_Header
# All fields are packed big-endian (">").
PCAP_HEADER = struct.Struct(">IHHiIII").pack(
    0xA1B2C3D4,  # Magic number
    2,  # Major version
    4,  # Minor version
    0,  # Timezone: GMT
    0,  # Sigfigs
    0xFFFF,  # Max packet length (65535)
    201,  # Protocol: BLUETOOTH_HCI_H4_WITH_PHDR
)
# Hint appended to error messages when the input does not look like the
# expected snapshot data.
BUG_MSG = "Are you sure this is a valid snapshot zipfile? If so, please file a bug."
class ConversionError(Exception):
    """Error raised when snapshot data cannot be read or lacks the snoop data.

    Attributes:
        reason: Human-readable explanation of the failure, suitable for
            printing to the user.
    """

    def __init__(self, reason):
        # Chain to Exception explicitly so the reason is the exception's
        # message/args by construction rather than as a side effect of
        # BaseException.__new__.
        super().__init__(reason)
        self.reason = reason
def exit_with_message(msg, code=1):
    """Write msg to stderr, flush it, and exit the process with status code.

    Args:
        msg: Text to emit; it is written verbatim (no newline is appended).
        code: Exit status passed to SystemExit. Defaults to 1.

    Raises:
        SystemExit: Always.
    """
    stream = sys.stderr
    stream.write(msg)
    # Flush explicitly so the message is out before the interpreter unwinds.
    stream.flush()
    raise SystemExit(code)
def convert_device_data(device):
    """Decode a device dict into the concatenated pcap packet bytes.

    Only entries whose value is a string starting with "b64:" are used; every
    other entry is ignored. The selected entries are ordered by the integer
    value of their key, then each value has the prefix removed and is
    base64-decoded.

    Args:
        device: Mapping of keys to values; keys of "b64:" entries must be
            parseable by int().

    Returns:
        The decoded chunks joined into a single bytes object.
    """
    prefix = "b64:"
    chunks = []
    for key, value in device.items():
        if not isinstance(value, str):
            continue
        if not value.startswith(prefix):
            continue
        # Keys are numeric strings; convert so "10" sorts after "2".
        chunks.append((int(key), value))
    chunks.sort()

    skip = len(prefix)
    return b"".join(b64decode(encoded[skip:]) for _, encoded in chunks)
def find_snoop_root(inspect):
    """
    Locate the bt-snoop component's entry in snapshot inspect data.

    Each entry of `inspect` is checked, in order, for the component name
    "bt-snoop.cmx" inside its "path" value and then its "moniker" value
    (a missing field is treated as an empty string). The first entry that
    matches is returned unchanged.

    Raises ConversionError when no entry matches.
    """
    target = "bt-snoop.cmx"
    fields = ("path", "moniker")
    for entry in inspect:
        if any(target in entry.get(field, "") for field in fields):
            return entry
    raise ConversionError(
        "Input does not contain the inspect data for bt-snoop.cmx.")
def get_inspect_from_snapshot(filename):
    """Read "inspect.json" out of a snapshot zip file and return it parsed.

    Args:
        filename: Path of the snapshot zip file.

    Returns:
        The object produced by json.loads for the archive member.

    Raises:
        ConversionError: If the zip file cannot be opened or read, if the
            archive has no "inspect.json" member, or if the member's content
            cannot be parsed as JSON.
    """
    member = "inspect.json"
    try:
        with ZipFile(filename) as archive:
            raw = archive.read(member)
        return json.loads(raw)
    except (IOError, BadZipFile) as err:
        parts = ('Failed to open snapshot zip file "', filename, '":\n',
                 str(err))
        raise ConversionError("".join(parts))
    except KeyError as err:
        # ZipFile.read raises KeyError when the member is not in the archive.
        raise ConversionError(" ".join((str(err), BUG_MSG)))
    except ValueError:
        prefix = "Failed to parse inspect json data from file '{}'. ".format(
            member)
        raise ConversionError(prefix + BUG_MSG)
def main():
    """Extract one HCI device's snoop log from a snapshot and emit it as pcap.

    Parses the command line (snapshot path, optional device name defaulting to
    "000", optional -o/--outfile), loads the inspect data from the snapshot,
    finds the bt-snoop.cmx entry and its "runtime_metrics" node, selects the
    "device_<name>" entry, and writes PCAP_HEADER followed by the decoded
    packet data to the output file or, when none is given, to stdout.

    Every handled failure is reported through exit_with_message, which prints
    to stderr and terminates the process.
    """
    # Kept as a literal instead of main.__doc__ so the --help text is stable
    # and still present when docstrings are stripped (python -OO).
    description = ("Take a path to a fuchsia snapshot zip file and write binary "
                   "pcap data to stdout")
    example = ("Example: fx bt-snoop-from-snapshot snapshot.zip | wireshark -k -i"
               " -")
    parser = argparse.ArgumentParser(description=description, epilog=example)
    parser.add_argument("path", help="path to snapshot zip file")
    parser.add_argument(
        "-o", "--outfile", help="write snoop log to a file instead of stdout")
    parser.add_argument(
        "device",
        nargs="?",
        default="000",
        help="name of the hci device file to output")
    args = parser.parse_args()

    try:
        result = get_inspect_from_snapshot(args.path)
    except ConversionError as e:
        exit_with_message(e.reason + "\n")

    try:
        component_inspect = find_snoop_root(result)
        # The node hierarchy is accepted under either "contents" or "payload".
        if "contents" in component_inspect:
            metrics = component_inspect["contents"]["root"]["runtime_metrics"]
        else:
            metrics = component_inspect["payload"]["root"]["runtime_metrics"]
    except ConversionError as e:
        exit_with_message(e.reason + "\n")
    except (AttributeError, IndexError, KeyError, TypeError):
        # AttributeError covers inspect entries that are not dicts (no .get).
        r = ("Inspect data for bt-snoop.cmx component does not conform to the "
             "expected data format. ")
        exit_with_message(r + BUG_MSG + "\n")

    device_key_prefix = "device_"
    try:
        device = metrics[device_key_prefix + args.device]
    except KeyError:
        # Help the user by listing the device names that do exist.
        devices = [
            k[len(device_key_prefix):]
            for k in metrics
            if k.startswith(device_key_prefix)
        ]
        msg = 'Device not present: "' + args.device + '". Valid device names: ' + ", ".join(
            devices)
        exit_with_message(msg + "\n")

    # Decode before touching the output so malformed data cannot leave a
    # header-only pcap file behind. binascii.Error (bad base64) is a
    # ValueError subclass; AttributeError covers a device node that is not a
    # dict.
    try:
        pcap_body = convert_device_data(device)
    except (AttributeError, ValueError):
        r = ('Snoop data for device "' + args.device +
             '" could not be decoded. ')
        exit_with_message(r + BUG_MSG + "\n")

    if args.outfile is not None:
        with open(args.outfile, "wb") as outfile:
            outfile.write(PCAP_HEADER)
            outfile.write(pcap_body)
    else:
        # write pcap binary data to stdout
        stdout_bytes = sys.stdout.buffer
        stdout_bytes.write(PCAP_HEADER)
        stdout_bytes.write(pcap_body)
        stdout_bytes.flush()
# Run the converter only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()