| #!/usr/bin/env python3 |
| # -*- coding: utf-8 -*- |
| # Copyright 2019 The Fuchsia Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| ### extract a snoop log from a snapshot as pcap formatted data |
| |
| import argparse |
| from base64 import b64decode |
| import json |
| import struct |
| import sys |
| from zipfile import BadZipFile, ZipFile |
| |
| # Format described in https://wiki.wireshark.org/Development/LibpcapFileFormat#Global_Header |
| PCAP_HEADER = struct.pack( |
| ">IHHiIII", |
| 0xa1b2c3d4, # Magic number |
| 2, # Major Version |
| 4, # Minor Version |
| 0, # Timezone: GMT |
| 0, # Sigfigs |
| 65535, # Max packet length |
| 201) # Protocol: BLUETOOTH_HCI_H4_WITH_PHDR |
| |
| BUG_MSG = ("Are you sure this is a valid snapshot zipfile? If so, please file a" |
| " bug.") |
| |
| |
| class ConversionError(Exception): |
| |
| def __init__(self, reason): |
| self.reason = reason |
| |
| |
| def exit_with_message(msg, code=1): |
| sys.stderr.write(msg) |
| sys.stderr.flush() |
| sys.exit(code) |
| |
| |
| def convert_device_data(device): |
| """ Take a device dict and convert it into pcap bytes """ |
| b64_prefix = "b64:" |
| sorted_data_pairs = sorted((int(k), v) |
| for k, v in device.items() |
| if isinstance(v, str) and v.startswith(b64_prefix)) |
| return b"".join( |
| b64decode(v[len(b64_prefix):]) for (_, v) in sorted_data_pairs) |
| |
| |
| def find_snoop_root(inspect): |
| """ |
| Return the root of the snoop component's inspect data given inspect data |
| generated by snapshot |
| """ |
| moniker = "bt-snoop.cmx" |
| for component in inspect: |
| if moniker in component.get("path", "") or moniker in component.get( |
| "moniker", ""): |
| return component |
| raise ConversionError( |
| "Input does not contain the inspect data for bt-snoop.cmx.") |
| |
| |
| def get_inspect_from_snapshot(filename): |
| """ Extract and parse inspect json data from a snapshot zip file """ |
| inspect_file = "inspect.json" |
| try: |
| with ZipFile(filename) as snapshot: |
| # find the full path to the inspect file and parse it as json data |
| inspect_path = next(name for name in snapshot.namelist() if name.endswith(inspect_file)) |
| data = snapshot.read(inspect_path) |
| data = json.loads(data) |
| return data |
| except (IOError, BadZipFile) as e: |
| raise ConversionError('Failed to open snapshot zip file "' + filename + |
| '":\n' + str(e)) |
| except StopIteration: |
| raise ConversionError('Could not find "' + inspect_file + |
| "' in snapshot zip file. " + BUG_MSG) |
| except KeyError as e: |
| reason = str(e) + " " + BUG_MSG |
| raise ConversionError(reason) |
| except ValueError: |
| reason = "Failed to parse inspect json data from file '" + inspect_file + "'. " + BUG_MSG |
| raise ConversionError(reason) |
| |
| |
| def main(): |
| """Take a path to a fuchsia snapshot zip file and write binary pcap data to stdout""" |
| description = main.__doc__ |
| example = ("Example: fx bt-snoop-from-snapshot snapshot.zip | wireshark -k -i" |
| " -") |
| parser = argparse.ArgumentParser(description=description, epilog=example) |
| parser.add_argument("path", help="path to snapshot zip file") |
| parser.add_argument( |
| "-o", "--outfile", help="write snoop log to a file instead of stdout") |
| parser.add_argument( |
| "device", |
| nargs="?", |
| default="000", |
| help="name of the hci device file to output") |
| args = parser.parse_args() |
| |
| try: |
| result = get_inspect_from_snapshot(args.path) |
| except ConversionError as e: |
| exit_with_message(e.reason + "\n") |
| |
| try: |
| component_inspect = find_snoop_root(result) |
| if "contents" in component_inspect: |
| metrics = component_inspect["contents"]["root"]["runtime_metrics"] |
| else: |
| metrics = component_inspect["payload"]["root"]["runtime_metrics"] |
| except ConversionError as e: |
| exit_with_message(e.reason + "\n") |
| except (IndexError, KeyError, TypeError): |
| r = ("Inspect data for bt-snoop.cmx component does not conform to the " |
| "expected data format. ") |
| exit_with_message(r + BUG_MSG + "\n") |
| |
| device_key_prefix = "device_" |
| |
| try: |
| device = metrics[device_key_prefix + args.device] |
| except KeyError: |
| devices = [ |
| k[len(device_key_prefix):] |
| for k in metrics |
| if k.startswith(device_key_prefix) |
| ] |
| msg = 'Device not present: "' + args.device + '". Valid device names: ' + ", ".join( |
| devices) |
| exit_with_message(msg + "\n") |
| |
| if args.outfile is not None: |
| with open(args.outfile, "wb") as outfile: |
| outfile.write(PCAP_HEADER) |
| outfile.write(convert_device_data(device)) |
| else: |
| # write pcap binary data to stdout |
| sys.stdout.buffer.write(PCAP_HEADER) |
| sys.stdout.buffer.write(convert_device_data(device)) |
| sys.stdout.buffer.flush() |
| |
| |
| if __name__ == "__main__": |
| main() |