blob: 124f021d3f27534207515b81a1a3860b5819e985 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2022 The Fuchsia Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import threading
import time
from mobly import asserts
from antlion import logger
from antlion.controllers.ap_lib.hostapd_constants import (
CENTER_CHANNEL_MAP,
FREQUENCY_MAP,
VHT_CHANNEL,
)
from antlion.controllers.utils_lib.ssh import connection, formatter, settings
from antlion.libs.proc.process import Process
MOBLY_CONTROLLER_CONFIG_NAME = "PacketCapture"
ACTS_CONTROLLER_REFERENCE_NAME = "packet_capture"
BSS = "BSS"
BSSID = "BSSID"
FREQ = "freq"
FREQUENCY = "frequency"
LEVEL = "level"
MON_2G = "mon0"
MON_5G = "mon1"
BAND_IFACE = {"2G": MON_2G, "5G": MON_5G}
SCAN_IFACE = "wlan2"
SCAN_TIMEOUT = 60
SEP = ":"
SIGNAL = "signal"
SSID = "SSID"
def create(configs):
return [PacketCapture(c) for c in configs]
def destroy(pcaps):
for pcap in pcaps:
pcap.close()
def get_info(pcaps):
return [pcap.ssh_settings.hostname for pcap in pcaps]
class PcapProperties(object):
"""Class to maintain packet capture properties after starting tcpdump.
Attributes:
proc: Process object of tcpdump
pcap_fname: File name of the tcpdump output file
pcap_file: File object for the tcpdump output file
"""
def __init__(self, proc, pcap_fname, pcap_file):
"""Initialize object."""
self.proc = proc
self.pcap_fname = pcap_fname
self.pcap_file = pcap_file
class PacketCaptureError(Exception):
"""Error related to Packet capture."""
class PacketCapture(object):
"""Class representing packet capturer.
An instance of this class creates and configures two interfaces for monitor
mode; 'mon0' for 2G and 'mon1' for 5G and one interface for scanning for
wifi networks; 'wlan2' which is a dual band interface.
Attributes:
pcap_properties: dict that specifies packet capture properties for a
band.
"""
def __init__(self, configs):
"""Initialize objects.
Args:
configs: config for the packet capture.
"""
self.ssh_settings = settings.from_config(configs["ssh_config"])
self.ssh = connection.SshConnection(self.ssh_settings)
self.log = logger.create_logger(
lambda msg: "[%s|%s] %s"
% (MOBLY_CONTROLLER_CONFIG_NAME, self.ssh_settings.hostname, msg)
)
self._create_interface(MON_2G, "monitor")
self._create_interface(MON_5G, "monitor")
self.managed_mode = True
result = self.ssh.run("ifconfig -a", ignore_status=True)
if result.stderr or SCAN_IFACE not in result.stdout:
self.managed_mode = False
if self.managed_mode:
self._create_interface(SCAN_IFACE, "managed")
self.pcap_properties = dict()
self._pcap_stop_lock = threading.Lock()
def _create_interface(self, iface, mode):
"""Create interface of monitor/managed mode.
Create mon0/mon1 for 2G/5G monitor mode and wlan2 for managed mode.
"""
if mode == "monitor":
self.ssh.run(f"ifconfig wlan{iface[-1]} down", ignore_status=True)
self.ssh.run(f"iw dev {iface} del", ignore_status=True)
self.ssh.run(
f"iw phy{iface[-1]} interface add {iface} type {mode}",
ignore_status=True,
)
self.ssh.run(f"ip link set {iface} up", ignore_status=True)
result = self.ssh.run(f"iw dev {iface} info", ignore_status=True)
if result.stderr or iface not in result.stdout:
raise PacketCaptureError(f"Failed to configure interface {iface}")
def _cleanup_interface(self, iface):
"""Clean up monitor mode interfaces."""
self.ssh.run(f"iw dev {iface} del", ignore_status=True)
result = self.ssh.run(f"iw dev {iface} info", ignore_status=True)
if not result.stderr or "No such device" not in result.stderr:
raise PacketCaptureError(f"Failed to cleanup monitor mode for {iface}")
def _parse_scan_results(self, scan_result):
"""Parses the scan dump output and returns list of dictionaries.
Args:
scan_result: scan dump output from scan on mon interface.
Returns:
Dictionary of found network in the scan.
The attributes returned are
a.) SSID - SSID of the network.
b.) LEVEL - signal level.
c.) FREQUENCY - WiFi band the network is on.
d.) BSSID - BSSID of the network.
"""
scan_networks = []
network = {}
for line in scan_result.splitlines():
if SEP not in line:
continue
if BSS in line:
network[BSSID] = line.split("(")[0].split()[-1]
field, value = line.lstrip().rstrip().split(SEP)[0:2]
value = value.lstrip()
if SIGNAL in line:
network[LEVEL] = int(float(value.split()[0]))
elif FREQ in line:
network[FREQUENCY] = int(value)
elif SSID in line:
network[SSID] = value
scan_networks.append(network)
network = {}
return scan_networks
def get_wifi_scan_results(self):
"""Starts a wifi scan on wlan2 interface.
Returns:
List of dictionaries each representing a found network.
"""
if not self.managed_mode:
raise PacketCaptureError("Managed mode not setup")
result = self.ssh.run(f"iw dev {SCAN_IFACE} scan")
if result.stderr:
raise PacketCaptureError("Failed to get scan dump")
if not result.stdout:
return []
return self._parse_scan_results(result.stdout)
def start_scan_and_find_network(self, ssid):
"""Start a wifi scan on wlan2 interface and find network.
Args:
ssid: SSID of the network.
Returns:
True/False if the network if found or not.
"""
curr_time = time.time()
while time.time() < curr_time + SCAN_TIMEOUT:
found_networks = self.get_wifi_scan_results()
for network in found_networks:
if network[SSID] == ssid:
return True
time.sleep(3) # sleep before next scan
return False
def configure_monitor_mode(self, band, channel, bandwidth=20):
"""Configure monitor mode.
Args:
band: band to configure monitor mode for.
channel: channel to set for the interface.
bandwidth : bandwidth for VHT channel as 40,80,160
Returns:
True if configure successful.
False if not successful.
"""
band = band.upper()
if band not in BAND_IFACE:
self.log.error("Invalid band. Must be 2g/2G or 5g/5G")
return False
iface = BAND_IFACE[band]
if bandwidth == 20:
self.ssh.run(f"iw dev {iface} set channel {channel}", ignore_status=True)
else:
center_freq = None
for i, j in CENTER_CHANNEL_MAP[VHT_CHANNEL[bandwidth]]["channels"]:
if channel in range(i, j + 1):
center_freq = (FREQUENCY_MAP[i] + FREQUENCY_MAP[j]) / 2
break
asserts.assert_true(center_freq, "No match channel in VHT channel list.")
self.ssh.run(
"iw dev %s set freq %s %s %s"
% (iface, FREQUENCY_MAP[channel], bandwidth, center_freq),
ignore_status=True,
)
result = self.ssh.run(f"iw dev {iface} info", ignore_status=True)
if result.stderr or f"channel {channel}" not in result.stdout:
self.log.error(f"Failed to configure monitor mode for {band}")
return False
return True
def start_packet_capture(self, band, log_path, pcap_fname):
"""Start packet capture for band.
band = 2G starts tcpdump on 'mon0' interface.
band = 5G starts tcpdump on 'mon1' interface.
Args:
band: '2g' or '2G' and '5g' or '5G'.
log_path: test log path to save the pcap file.
pcap_fname: name of the pcap file.
Returns:
pcap_proc: Process object of the tcpdump.
"""
band = band.upper()
if band not in BAND_IFACE.keys() or band in self.pcap_properties:
self.log.error("Invalid band or packet capture already running")
return None
pcap_name = f"{pcap_fname}_{band}.pcap"
pcap_fname = os.path.join(log_path, pcap_name)
pcap_file = open(pcap_fname, "w+b")
tcpdump_cmd = f"tcpdump -i {BAND_IFACE[band]} -w - -U 2>/dev/null"
cmd = formatter.SshFormatter().format_command(
tcpdump_cmd, None, self.ssh_settings, extra_flags={"-q": None}
)
pcap_proc = Process(cmd)
pcap_proc.set_on_output_callback(lambda msg: pcap_file.write(msg), binary=True)
pcap_proc.start()
self.pcap_properties[band] = PcapProperties(pcap_proc, pcap_fname, pcap_file)
return pcap_proc
def stop_packet_capture(self, proc):
"""Stop the packet capture.
Args:
proc: Process object of tcpdump to kill.
"""
for key, val in self.pcap_properties.items():
if val.proc is proc:
break
else:
self.log.error("Failed to stop tcpdump. Invalid process.")
return
proc.stop()
with self._pcap_stop_lock:
self.pcap_properties[key].pcap_file.close()
del self.pcap_properties[key]
def close(self):
"""Cleanup.
Cleans up all the monitor mode interfaces and closes ssh connections.
"""
self._cleanup_interface(MON_2G)
self._cleanup_interface(MON_5G)
self.ssh.close()