# blob: 441d60bbca8bdd6af20f3f0fc053173b663bab05 [file] [log] [blame]
# Copyright 2016 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Class for Local sniffers - i.e. running on the local machine.
This class provides configuration for local interfaces but leaves
the actual capture (sniff) to sub-classes.
"""
import os
import shutil
import signal
import subprocess
import tempfile
from mobly import logger
from mobly import utils
from mobly.controllers import sniffer
class SnifferLocalBase(sniffer.Sniffer):
    """This class defines the common behaviors of WLAN sniffers running on
    WLAN interfaces of the local machine.

    Specific mechanisms to capture packets over the local WLAN interfaces are
    implemented by sub-classes of this class - i.e. it is not a final class.
    Sub-classes must override `_get_command_line`.
    """

    def __init__(self, interface, logger, base_configs=None):
        """See base class documentation

        Stores the configuration and switches the interface to monitor mode
        by running `ifconfig <if> down`, `iwconfig <if> mode monitor` and
        `ifconfig <if> up`.

        Args:
            interface: Name of the local interface to sniff on.
            logger: Object whose `log_path` attribute is used as the root
                directory for capture files.
            base_configs: Optional dict of baseline configurations applied
                before every capture.

        Raises:
            sniffer.InvalidDataError: if `interface` is an empty string.
            sniffer.ExecutionError: if any of the interface setup commands
                fails.
        """
        self._base_configs = None
        self._capture_file_path = ""
        self._interface = ""
        self._logger = logger
        self._process = None
        self._temp_capture_file_path = ""

        if interface == "":
            raise sniffer.InvalidDataError("Empty interface provided")
        self._interface = interface
        self._base_configs = base_configs

        try:
            subprocess.check_call(['ifconfig', self._interface, 'down'])
            subprocess.check_call(
                ['iwconfig', self._interface, 'mode', 'monitor'])
            subprocess.check_call(['ifconfig', self._interface, 'up'])
        except Exception as err:
            # Keep the original failure attached as __cause__ for debugging.
            raise sniffer.ExecutionError(err) from err

    def get_interface(self):
        """See base class documentation
        """
        return self._interface

    def get_type(self):
        """See base class documentation
        """
        return "local"

    def get_capture_file(self):
        """Returns the destination path of the most recently started capture
        (an empty string if no capture has been started yet).
        """
        return self._capture_file_path

    def _pre_capture_config(self, override_configs=None):
        """Utility function which configures the wireless interface per the
        specified configurations. Operation is performed before every capture
        start using baseline configurations (specified when sniffer initialized)
        and override configurations specified here.

        Override values take precedence over baseline values. Only the
        channel configuration is acted upon here.

        Args:
            override_configs: Optional dict of configurations layered on top
                of the baseline configurations.

        Raises:
            sniffer.ExecutionError: if setting the channel fails.
        """
        final_configs = {}
        if self._base_configs:
            final_configs.update(self._base_configs)
        if override_configs:
            final_configs.update(override_configs)

        if sniffer.Sniffer.CONFIG_KEY_CHANNEL in final_configs:
            try:
                subprocess.check_call([
                    'iwconfig', self._interface, 'channel',
                    str(final_configs[sniffer.Sniffer.CONFIG_KEY_CHANNEL])
                ])
            except Exception as err:
                raise sniffer.ExecutionError(err) from err

    def _get_command_line(self,
                          additional_args=None,
                          duration=None,
                          packet_count=None):
        """Utility function to be implemented by every child class - which
        are the concrete sniffer classes. Each sniffer-specific class should
        derive the command line to execute its sniffer based on the specified
        arguments.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError("Base class should not be called directly!")

    def _post_process(self):
        """Utility function which is executed after a capture is done. It
        moves the capture file to the requested location.

        Also clears the process handle so that a new capture can be started.
        """
        self._process = None
        shutil.move(self._temp_capture_file_path, self._capture_file_path)

    def start_capture(self,
                      override_configs=None,
                      additional_args=None,
                      duration=None,
                      packet_count=None):
        """See base class documentation

        The capture is written to a temporary file and moved to
        `<log_path>/Sniffer-<interface>/capture_<timestamp>.pcap` once the
        capture is stopped or completes.

        Returns:
            A sniffer.ActiveCaptureContext built from this sniffer and
            `duration`.

        Raises:
            sniffer.InvalidOperationError: if a capture is already running.
            sniffer.ExecutionError: if configuring the interface fails.
        """
        if self._process is not None:
            raise sniffer.InvalidOperationError(
                "Trying to start a sniff while another is still running!")

        capture_dir = os.path.join(self._logger.log_path,
                                   "Sniffer-{}".format(self._interface))
        os.makedirs(capture_dir, exist_ok=True)
        self._capture_file_path = os.path.join(
            capture_dir,
            "capture_{}.pcap".format(logger.get_log_file_timestamp()))

        self._pre_capture_config(override_configs)

        # mkstemp returns an already-open OS-level descriptor together with
        # the path. Only the path is used from here on, so close the
        # descriptor right away instead of leaking one per capture.
        temp_fd, self._temp_capture_file_path = tempfile.mkstemp(
            suffix=".pcap")
        os.close(temp_fd)

        cmd = self._get_command_line(additional_args=additional_args,
                                     duration=duration,
                                     packet_count=packet_count)
        self._process = utils.start_standing_subprocess(cmd)
        return sniffer.ActiveCaptureContext(self, duration)

    def stop_capture(self):
        """See base class documentation

        Raises:
            sniffer.InvalidOperationError: if no capture is running.
        """
        if self._process is None:
            raise sniffer.InvalidOperationError(
                "Trying to stop a non-started process")
        utils.stop_standing_subprocess(self._process)
        self._post_process()

    def wait_for_capture(self, timeout=None):
        """See base class documentation

        If the wait times out, the capture is stopped explicitly; in both
        cases the capture file is moved to its final location.

        Raises:
            sniffer.InvalidOperationError: if no capture is running.
        """
        if self._process is None:
            raise sniffer.InvalidOperationError(
                "Trying to wait on a non-started process")
        try:
            utils.wait_for_standing_subprocess(self._process, timeout)
        except subprocess.TimeoutExpired:
            self.stop_capture()
        else:
            self._post_process()