| #!/usr/bin/env python3 |
| # |
| # Copyright 2022 The Fuchsia Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import collections |
| import logging |
| import math |
| import os |
| import re |
| import shutil |
| import socket |
| import time |
| from datetime import datetime |
| |
| from antlion import context |
| from antlion import logger as acts_logger |
| from antlion import tracelogger, utils |
| from antlion.controllers import adb, fastboot |
| from antlion.controllers.adb_lib.error import AdbError |
| from antlion.controllers.android_lib import errors |
| from antlion.controllers.android_lib import events as android_events |
| from antlion.controllers.android_lib import logcat, services |
| from antlion.controllers.sl4a_lib import sl4a_manager |
| from antlion.controllers.utils_lib.ssh import connection, settings |
| from antlion.event import event_bus |
| from antlion.libs.proc import job |
| |
| MOBLY_CONTROLLER_CONFIG_NAME = "AndroidDevice" |
| ACTS_CONTROLLER_REFERENCE_NAME = "android_devices" |
| |
| ANDROID_DEVICE_PICK_ALL_TOKEN = "*" |
| # Key name for SL4A extra params in config file |
| ANDROID_DEVICE_SL4A_CLIENT_PORT_KEY = "sl4a_client_port" |
| ANDROID_DEVICE_SL4A_FORWARDED_PORT_KEY = "sl4a_forwarded_port" |
| ANDROID_DEVICE_SL4A_SERVER_PORT_KEY = "sl4a_server_port" |
| # Key name for adb logcat extra params in config file. |
| ANDROID_DEVICE_ADB_LOGCAT_PARAM_KEY = "adb_logcat_param" |
| ANDROID_DEVICE_EMPTY_CONFIG_MSG = "Configuration is empty, abort!" |
| ANDROID_DEVICE_NOT_LIST_CONFIG_MSG = "Configuration should be a list, abort!" |
| CRASH_REPORT_PATHS = ( |
| "/data/tombstones/", |
| "/data/vendor/ramdump/", |
| "/data/ramdump/", |
| "/data/vendor/ssrdump", |
| "/data/vendor/ramdump/bluetooth", |
| "/data/vendor/log/cbd", |
| ) |
| CRASH_REPORT_SKIPS = ( |
| "RAMDUMP_RESERVED", |
| "RAMDUMP_STATUS", |
| "RAMDUMP_OUTPUT", |
| "bluetooth", |
| ) |
| ALWAYS_ON_LOG_PATH = "/data/vendor/radio/logs/always-on" |
| DEFAULT_QXDM_LOG_PATH = "/data/vendor/radio/diag_logs" |
| DEFAULT_SDM_LOG_PATH = "/data/vendor/slog/" |
| DEFAULT_SCREENSHOT_PATH = "/sdcard/Pictures/screencap" |
| BUG_REPORT_TIMEOUT = 1800 |
| PULL_TIMEOUT = 300 |
| PORT_RETRY_COUNT = 3 |
| ADB_ROOT_RETRY_COUNT = 2 |
| ADB_ROOT_RETRY_INTERVAL = 10 |
| IPERF_TIMEOUT = 60 |
| SL4A_APK_NAME = "com.googlecode.android_scripting" |
| WAIT_FOR_DEVICE_TIMEOUT = 180 |
| ENCRYPTION_WINDOW = "CryptKeeper" |
| DEFAULT_DEVICE_PASSWORD = "1111" |
| RELEASE_ID_REGEXES = [re.compile(r"\w+\.\d+\.\d+"), re.compile(r"N\w+")] |
| |
| |
| def create(configs): |
| """Creates AndroidDevice controller objects. |
| |
| Args: |
| configs: A list of dicts, each representing a configuration for an |
| Android device. |
| |
| Returns: |
| A list of AndroidDevice objects. |
| """ |
| if not configs: |
| raise errors.AndroidDeviceConfigError(ANDROID_DEVICE_EMPTY_CONFIG_MSG) |
| elif configs == ANDROID_DEVICE_PICK_ALL_TOKEN: |
| ads = get_all_instances() |
| elif not isinstance(configs, list): |
| raise errors.AndroidDeviceConfigError(ANDROID_DEVICE_NOT_LIST_CONFIG_MSG) |
| elif isinstance(configs[0], str): |
| # Configs is a list of serials. |
| ads = get_instances(configs) |
| else: |
| # Configs is a list of dicts. |
| ads = get_instances_with_configs(configs) |
| |
| ads[0].log.info(f'The primary device under test is "{ads[0].serial}".') |
| |
| for ad in ads: |
| if not ad.is_connected(): |
| raise errors.AndroidDeviceError( |
| ("Android device %s is specified in config" " but is not attached.") |
| % ad.serial, |
| serial=ad.serial, |
| ) |
| _start_services_on_ads(ads) |
| for ad in ads: |
| if ad.droid: |
| utils.set_location_service(ad, False) |
| utils.sync_device_time(ad) |
| return ads |
| |
| |
| def destroy(ads): |
| """Cleans up AndroidDevice objects. |
| |
| Args: |
| ads: A list of AndroidDevice objects. |
| """ |
| for ad in ads: |
| try: |
| ad.clean_up() |
| except: |
| ad.log.exception("Failed to clean up properly.") |
| |
| |
| def get_info(ads): |
| """Get information on a list of AndroidDevice objects. |
| |
| Args: |
| ads: A list of AndroidDevice objects. |
| |
| Returns: |
| A list of dict, each representing info for an AndroidDevice objects. |
| """ |
| device_info = [] |
| for ad in ads: |
| info = {"serial": ad.serial, "model": ad.model} |
| info.update(ad.build_info) |
| device_info.append(info) |
| return device_info |
| |
| |
| def _start_services_on_ads(ads): |
| """Starts long running services on multiple AndroidDevice objects. |
| |
| If any one AndroidDevice object fails to start services, cleans up all |
| existing AndroidDevice objects and their services. |
| |
| Args: |
| ads: A list of AndroidDevice objects whose services to start. |
| """ |
| running_ads = [] |
| for ad in ads: |
| running_ads.append(ad) |
| try: |
| ad.start_services() |
| except: |
| ad.log.exception("Failed to start some services, abort!") |
| destroy(running_ads) |
| raise |
| |
| |
| def _parse_device_list(device_list_str, key): |
| """Parses a byte string representing a list of devices. The string is |
| generated by calling either adb or fastboot. |
| |
| Args: |
| device_list_str: Output of adb or fastboot. |
| key: The token that signifies a device in device_list_str. |
| |
| Returns: |
| A list of android device serial numbers. |
| """ |
| return re.findall(r"(\S+)\t%s" % key, device_list_str) |
| |
| |
| def list_adb_devices(): |
| """List all android devices connected to the computer that are detected by |
| adb. |
| |
| Returns: |
| A list of android device serials. Empty if there's none. |
| """ |
| out = adb.AdbProxy().devices() |
| return _parse_device_list(out, "device") |
| |
| |
| def list_fastboot_devices(): |
| """List all android devices connected to the computer that are in in |
| fastboot mode. These are detected by fastboot. |
| |
| Returns: |
| A list of android device serials. Empty if there's none. |
| """ |
| out = fastboot.FastbootProxy().devices() |
| return _parse_device_list(out, "fastboot") |
| |
| |
| def get_instances(serials): |
| """Create AndroidDevice instances from a list of serials. |
| |
| Args: |
| serials: A list of android device serials. |
| |
| Returns: |
| A list of AndroidDevice objects. |
| """ |
| results = [] |
| for s in serials: |
| results.append(AndroidDevice(s)) |
| return results |
| |
| |
| def get_instances_with_configs(configs): |
| """Create AndroidDevice instances from a list of json configs. |
| |
| Each config should have the required key-value pair "serial". |
| |
| Args: |
| configs: A list of dicts each representing the configuration of one |
| android device. |
| |
| Returns: |
| A list of AndroidDevice objects. |
| """ |
| results = [] |
| for c in configs: |
| try: |
| serial = c.pop("serial") |
| except KeyError: |
| raise errors.AndroidDeviceConfigError( |
| f"Required value 'serial' is missing in AndroidDevice config {c}." |
| ) |
| client_port = 0 |
| if ANDROID_DEVICE_SL4A_CLIENT_PORT_KEY in c: |
| try: |
| client_port = int(c.pop(ANDROID_DEVICE_SL4A_CLIENT_PORT_KEY)) |
| except ValueError: |
| raise errors.AndroidDeviceConfigError( |
| "'%s' is not a valid number for config %s" |
| % (ANDROID_DEVICE_SL4A_CLIENT_PORT_KEY, c) |
| ) |
| server_port = None |
| if ANDROID_DEVICE_SL4A_SERVER_PORT_KEY in c: |
| try: |
| server_port = int(c.pop(ANDROID_DEVICE_SL4A_SERVER_PORT_KEY)) |
| except ValueError: |
| raise errors.AndroidDeviceConfigError( |
| "'%s' is not a valid number for config %s" |
| % (ANDROID_DEVICE_SL4A_SERVER_PORT_KEY, c) |
| ) |
| forwarded_port = 0 |
| if ANDROID_DEVICE_SL4A_FORWARDED_PORT_KEY in c: |
| try: |
| forwarded_port = int(c.pop(ANDROID_DEVICE_SL4A_FORWARDED_PORT_KEY)) |
| except ValueError: |
| raise errors.AndroidDeviceConfigError( |
| "'%s' is not a valid number for config %s" |
| % (ANDROID_DEVICE_SL4A_FORWARDED_PORT_KEY, c) |
| ) |
| ssh_config = c.pop("ssh_config", None) |
| ssh_connection = None |
| if ssh_config is not None: |
| ssh_settings = settings.from_config(ssh_config) |
| ssh_connection = connection.SshConnection(ssh_settings) |
| ad = AndroidDevice( |
| serial, |
| ssh_connection=ssh_connection, |
| client_port=client_port, |
| forwarded_port=forwarded_port, |
| server_port=server_port, |
| ) |
| ad.load_config(c) |
| results.append(ad) |
| return results |
| |
| |
| def get_all_instances(include_fastboot=False): |
| """Create AndroidDevice instances for all attached android devices. |
| |
| Args: |
| include_fastboot: Whether to include devices in bootloader mode or not. |
| |
| Returns: |
| A list of AndroidDevice objects each representing an android device |
| attached to the computer. |
| """ |
| if include_fastboot: |
| serial_list = list_adb_devices() + list_fastboot_devices() |
| return get_instances(serial_list) |
| return get_instances(list_adb_devices()) |
| |
| |
| def filter_devices(ads, func): |
| """Finds the AndroidDevice instances from a list that match certain |
| conditions. |
| |
| Args: |
| ads: A list of AndroidDevice instances. |
| func: A function that takes an AndroidDevice object and returns True |
| if the device satisfies the filter condition. |
| |
| Returns: |
| A list of AndroidDevice instances that satisfy the filter condition. |
| """ |
| results = [] |
| for ad in ads: |
| if func(ad): |
| results.append(ad) |
| return results |
| |
| |
| def get_device(ads, **kwargs): |
| """Finds a unique AndroidDevice instance from a list that has specific |
| attributes of certain values. |
| |
| Example: |
| get_device(android_devices, label="foo", phone_number="1234567890") |
| get_device(android_devices, model="angler") |
| |
| Args: |
| ads: A list of AndroidDevice instances. |
| kwargs: keyword arguments used to filter AndroidDevice instances. |
| |
| Returns: |
| The target AndroidDevice instance. |
| |
| Raises: |
| AndroidDeviceError is raised if none or more than one device is |
| matched. |
| """ |
| |
| def _get_device_filter(ad): |
| for k, v in kwargs.items(): |
| if not hasattr(ad, k): |
| return False |
| elif getattr(ad, k) != v: |
| return False |
| return True |
| |
| filtered = filter_devices(ads, _get_device_filter) |
| if not filtered: |
| raise ValueError( |
| f"Could not find a target device that matches condition: {kwargs}." |
| ) |
| elif len(filtered) == 1: |
| return filtered[0] |
| else: |
| serials = [ad.serial for ad in filtered] |
| raise ValueError(f"More than one device matched: {serials}") |
| |
| |
| def take_bug_reports(ads, test_name, begin_time): |
| """Takes bug reports on a list of android devices. |
| |
| If you want to take a bug report, call this function with a list of |
| android_device objects in on_fail. But reports will be taken on all the |
| devices in the list concurrently. Bug report takes a relative long |
| time to take, so use this cautiously. |
| |
| Args: |
| ads: A list of AndroidDevice instances. |
| test_name: Name of the test case that triggered this bug report. |
| begin_time: Logline format timestamp taken when the test started. |
| """ |
| |
| def take_br(test_name, begin_time, ad): |
| ad.take_bug_report(test_name, begin_time) |
| |
| args = [(test_name, begin_time, ad) for ad in ads] |
| utils.concurrent_exec(take_br, args) |
| |
| |
| class AndroidDevice: |
| """Class representing an android device. |
| |
| Each object of this class represents one Android device in ACTS, including |
| handles to adb, fastboot, and sl4a clients. In addition to direct adb |
| commands, this object also uses adb port forwarding to talk to the Android |
| device. |
| |
| Attributes: |
| serial: A string that's the serial number of the Android device. |
| log_path: A string that is the path where all logs collected on this |
| android device should be stored. |
| log: A logger adapted from root logger with added token specific to an |
| AndroidDevice instance. |
| adb_logcat_process: A process that collects the adb logcat. |
| adb: An AdbProxy object used for interacting with the device via adb. |
| fastboot: A FastbootProxy object used for interacting with the device |
| via fastboot. |
| client_port: Preferred client port number on the PC host side for SL4A |
| forwarded_port: Preferred server port number forwarded from Android |
| to the host PC via adb for SL4A connections |
| server_port: Preferred server port used by SL4A on Android device |
| |
| """ |
| |
| def __init__( |
| self, |
| serial="", |
| ssh_connection=None, |
| client_port=0, |
| forwarded_port=0, |
| server_port=None, |
| ): |
| self.serial = serial |
| # logging.log_path only exists when this is used in an ACTS test run. |
| log_path_base = getattr(logging, "log_path", "/tmp/logs") |
| self.log_dir = f"AndroidDevice{serial}" |
| self.log_path = os.path.join(log_path_base, self.log_dir) |
| self.client_port = client_port |
| self.forwarded_port = forwarded_port |
| self.server_port = server_port |
| self.log = tracelogger.TraceLogger( |
| AndroidDeviceLoggerAdapter(logging.getLogger(), {"serial": serial}) |
| ) |
| self._event_dispatchers = {} |
| self._services = [] |
| self.register_service(services.AdbLogcatService(self)) |
| self.register_service(services.Sl4aService(self)) |
| self.adb_logcat_process = None |
| self.adb = adb.AdbProxy(serial, ssh_connection=ssh_connection) |
| self.fastboot = fastboot.FastbootProxy(serial, ssh_connection=ssh_connection) |
| if not self.is_bootloader: |
| self.root_adb() |
| self._ssh_connection = ssh_connection |
| self.skip_sl4a = False |
| self.crash_report = None |
| self.data_accounting = collections.defaultdict(int) |
| self._sl4a_manager = sl4a_manager.create_sl4a_manager(self.adb) |
| self.last_logcat_timestamp = None |
| # Device info cache. |
| self._user_added_device_info = {} |
| self._sdk_api_level = None |
| |
| def clean_up(self): |
| """Cleans up the AndroidDevice object and releases any resources it |
| claimed. |
| """ |
| self.stop_services() |
| for service in self._services: |
| service.unregister() |
| self._services.clear() |
| if self._ssh_connection: |
| self._ssh_connection.close() |
| |
| def recreate_services(self, serial): |
| """Clean up the AndroidDevice object and re-create adb/sl4a services. |
| |
| Unregister the existing services and re-create adb and sl4a services, |
| call this method when the connection break after certain API call |
| (e.g., enable USB tethering by #startTethering) |
| |
| Args: |
| serial: the serial number of the AndroidDevice |
| """ |
| # Clean the old services |
| for service in self._services: |
| service.unregister() |
| self._services.clear() |
| if self._ssh_connection: |
| self._ssh_connection.close() |
| self._sl4a_manager.stop_service() |
| |
| # Wait for old services to stop |
| time.sleep(5) |
| |
| # Re-create the new adb and sl4a services |
| self.register_service(services.AdbLogcatService(self)) |
| self.register_service(services.Sl4aService(self)) |
| self.adb.wait_for_device() |
| self.terminate_all_sessions() |
| self.start_services() |
| |
| def register_service(self, service): |
| """Registers the service on the device.""" |
| service.register() |
| self._services.append(service) |
| |
| # TODO(angli): This function shall be refactored to accommodate all services |
| # and not have hard coded switch for SL4A when b/29157104 is done. |
| def start_services(self, skip_setup_wizard=True): |
| """Starts long running services on the android device. |
| |
| 1. Start adb logcat capture. |
| 2. Start SL4A if not skipped. |
| |
| Args: |
| skip_setup_wizard: Whether or not to skip the setup wizard. |
| """ |
| if skip_setup_wizard: |
| self.exit_setup_wizard() |
| |
| event_bus.post(android_events.AndroidStartServicesEvent(self)) |
| |
| def stop_services(self): |
| """Stops long running services on the android device. |
| |
| Stop adb logcat and terminate sl4a sessions if exist. |
| """ |
| event_bus.post( |
| android_events.AndroidStopServicesEvent(self), ignore_errors=True |
| ) |
| |
| def is_connected(self): |
| out = self.adb.devices() |
| devices = _parse_device_list(out, "device") |
| return self.serial in devices |
| |
| @property |
| def build_info(self): |
| """Get the build info of this Android device, including build id and |
| build type. |
| |
| This is not available if the device is in bootloader mode. |
| |
| Returns: |
| A dict with the build info of this Android device, or None if the |
| device is in bootloader mode. |
| """ |
| if self.is_bootloader: |
| self.log.error("Device is in fastboot mode, could not get build " "info.") |
| return |
| |
| build_id = self.adb.getprop("ro.build.id") |
| incremental_build_id = self.adb.getprop("ro.build.version.incremental") |
| valid_build_id = False |
| for regex in RELEASE_ID_REGEXES: |
| if re.match(regex, build_id): |
| valid_build_id = True |
| break |
| if not valid_build_id: |
| build_id = incremental_build_id |
| |
| info = { |
| "build_id": build_id, |
| "incremental_build_id": incremental_build_id, |
| "build_type": self.adb.getprop("ro.build.type"), |
| } |
| return info |
| |
| @property |
| def device_info(self): |
| """Information to be pulled into controller info. |
| |
| The latest serial, model, and build_info are included. Additional info |
| can be added via `add_device_info`. |
| """ |
| info = { |
| "serial": self.serial, |
| "model": self.model, |
| "build_info": self.build_info, |
| "user_added_info": self._user_added_device_info, |
| "flavor": self.flavor, |
| } |
| return info |
| |
| def add_device_info(self, name, info): |
| """Add custom device info to the user_added_info section. |
| |
| Adding the same info name the second time will override existing info. |
| |
| Args: |
| name: string, name of this info. |
| info: serializable, content of the info. |
| """ |
| self._user_added_device_info.update({name: info}) |
| |
| def sdk_api_level(self): |
| if self._sdk_api_level is not None: |
| return self._sdk_api_level |
| if self.is_bootloader: |
| self.log.error("Device is in fastboot mode. Cannot get build info.") |
| return |
| self._sdk_api_level = int(self.adb.shell("getprop ro.build.version.sdk")) |
| return self._sdk_api_level |
| |
| @property |
| def is_bootloader(self): |
| """True if the device is in bootloader mode.""" |
| return self.serial in list_fastboot_devices() |
| |
| @property |
| def is_adb_root(self): |
| """True if adb is running as root for this device.""" |
| try: |
| return "0" == self.adb.shell("id -u") |
| except AdbError: |
| # Wait a bit and retry to work around adb flakiness for this cmd. |
| time.sleep(0.2) |
| return "0" == self.adb.shell("id -u") |
| |
| @property |
| def model(self): |
| """The Android code name for the device.""" |
| # If device is in bootloader mode, get mode name from fastboot. |
| if self.is_bootloader: |
| out = self.fastboot.getvar("product").strip() |
| # "out" is never empty because of the "total time" message fastboot |
| # writes to stderr. |
| lines = out.split("\n", 1) |
| if lines: |
| tokens = lines[0].split(" ") |
| if len(tokens) > 1: |
| return tokens[1].lower() |
| return None |
| model = self.adb.getprop("ro.build.product").lower() |
| if model == "sprout": |
| return model |
| else: |
| return self.adb.getprop("ro.product.name").lower() |
| |
| @property |
| def flavor(self): |
| """Returns the specific flavor of Android build the device is using.""" |
| return self.adb.getprop("ro.build.flavor").lower() |
| |
| @property |
| def droid(self): |
| """Returns the RPC Service of the first Sl4aSession created.""" |
| if len(self._sl4a_manager.sessions) > 0: |
| session_id = sorted(self._sl4a_manager.sessions.keys())[0] |
| return self._sl4a_manager.sessions[session_id].rpc_client |
| else: |
| return None |
| |
| @property |
| def ed(self): |
| """Returns the event dispatcher of the first Sl4aSession created.""" |
| if len(self._sl4a_manager.sessions) > 0: |
| session_id = sorted(self._sl4a_manager.sessions.keys())[0] |
| return self._sl4a_manager.sessions[session_id].get_event_dispatcher() |
| else: |
| return None |
| |
| @property |
| def sl4a_sessions(self): |
| """Returns a dictionary of session ids to sessions.""" |
| return list(self._sl4a_manager.sessions) |
| |
| @property |
| def is_adb_logcat_on(self): |
| """Whether there is an ongoing adb logcat collection.""" |
| if self.adb_logcat_process: |
| if self.adb_logcat_process.is_running(): |
| return True |
| else: |
| # if skip_sl4a is true, there is no sl4a session |
| # if logcat died due to device reboot and sl4a session has |
| # not restarted there is no droid. |
| if self.droid: |
| self.droid.logI("Logcat died") |
| self.log.info("Logcat to %s died", self.log_path) |
| return False |
| return False |
| |
| @property |
| def device_log_path(self): |
| """Returns the directory for all Android device logs for the current |
| test context and serial. |
| """ |
| return context.get_current_context().get_full_output_path(self.serial) |
| |
| def update_sdk_api_level(self): |
| self._sdk_api_level = None |
| self.sdk_api_level() |
| |
| def load_config(self, config): |
| """Add attributes to the AndroidDevice object based on json config. |
| |
| Args: |
| config: A dictionary representing the configs. |
| |
| Raises: |
| AndroidDeviceError is raised if the config is trying to overwrite |
| an existing attribute. |
| """ |
| for k, v in config.items(): |
| # skip_sl4a value can be reset from config file |
| if hasattr(self, k) and k != "skip_sl4a": |
| raise errors.AndroidDeviceError( |
| f"Attempting to set existing attribute {k} on {self.serial}", |
| serial=self.serial, |
| ) |
| setattr(self, k, v) |
| |
| def root_adb(self): |
| """Change adb to root mode for this device if allowed. |
| |
| If executed on a production build, adb will not be switched to root |
| mode per security restrictions. |
| """ |
| if self.is_adb_root: |
| return |
| |
| for attempt in range(ADB_ROOT_RETRY_COUNT): |
| try: |
| self.log.debug(f"Enabling ADB root mode: attempt {attempt}.") |
| self.adb.root() |
| except AdbError: |
| if attempt == ADB_ROOT_RETRY_COUNT: |
| raise |
| time.sleep(ADB_ROOT_RETRY_INTERVAL) |
| self.adb.wait_for_device() |
| |
| def get_droid(self, handle_event=True): |
| """Create an sl4a connection to the device. |
| |
| Return the connection handler 'droid'. By default, another connection |
| on the same session is made for EventDispatcher, and the dispatcher is |
| returned to the caller as well. |
| If sl4a server is not started on the device, try to start it. |
| |
| Args: |
| handle_event: True if this droid session will need to handle |
| events. |
| |
| Returns: |
| droid: Android object used to communicate with sl4a on the android |
| device. |
| ed: An optional EventDispatcher to organize events for this droid. |
| |
| Examples: |
| Don't need event handling: |
| >>> ad = AndroidDevice() |
| >>> droid = ad.get_droid(False) |
| |
| Need event handling: |
| >>> ad = AndroidDevice() |
| >>> droid, ed = ad.get_droid() |
| """ |
| self.log.debug( |
| "Creating RPC client_port={}, forwarded_port={}, server_port={}".format( |
| self.client_port, self.forwarded_port, self.server_port |
| ) |
| ) |
| session = self._sl4a_manager.create_session( |
| client_port=self.client_port, |
| forwarded_port=self.forwarded_port, |
| server_port=self.server_port, |
| ) |
| droid = session.rpc_client |
| if handle_event: |
| ed = session.get_event_dispatcher() |
| return droid, ed |
| return droid |
| |
| def get_package_pid(self, package_name): |
| """Gets the pid for a given package. Returns None if not running. |
| Args: |
| package_name: The name of the package. |
| Returns: |
| The first pid found under a given package name. None if no process |
| was found running the package. |
| Raises: |
| AndroidDeviceError if the output of the phone's process list was |
| in an unexpected format. |
| """ |
| for cmd in ("ps -A", "ps"): |
| try: |
| out = self.adb.shell( |
| f'{cmd} | grep "S {package_name}"', ignore_status=True |
| ) |
| if package_name not in out: |
| continue |
| try: |
| pid = int(out.split()[1]) |
| self.log.info("apk %s has pid %s.", package_name, pid) |
| return pid |
| except (IndexError, ValueError) as e: |
| # Possible ValueError from string to int cast. |
| # Possible IndexError from split. |
| self.log.warning( |
| 'Command "%s" returned output line: ' '"%s".\nError: %s', |
| cmd, |
| out, |
| e, |
| ) |
| except Exception as e: |
| self.log.warning( |
| 'Device fails to check if %s running with "%s"\n' "Exception %s", |
| package_name, |
| cmd, |
| e, |
| ) |
| self.log.debug("apk %s is not running", package_name) |
| return None |
| |
| def get_dispatcher(self, droid): |
| """Return an EventDispatcher for an sl4a session |
| |
| Args: |
| droid: Session to create EventDispatcher for. |
| |
| Returns: |
| ed: An EventDispatcher for specified session. |
| """ |
| return self._sl4a_manager.sessions[droid.uid].get_event_dispatcher() |
| |
| def _is_timestamp_in_range(self, target, log_begin_time, log_end_time): |
| low = acts_logger.logline_timestamp_comparator(log_begin_time, target) <= 0 |
| high = acts_logger.logline_timestamp_comparator(log_end_time, target) >= 0 |
| return low and high |
| |
| def cat_adb_log(self, tag, begin_time, end_time=None, dest_path="AdbLogExcerpts"): |
| """Takes an excerpt of the adb logcat log from a certain time point to |
| current time. |
| |
| Args: |
| tag: An identifier of the time period, usually the name of a test. |
| begin_time: Epoch time of the beginning of the time period. |
| end_time: Epoch time of the ending of the time period, default None |
| dest_path: Destination path of the excerpt file. |
| """ |
| log_begin_time = acts_logger.epoch_to_log_line_timestamp(begin_time) |
| if end_time is None: |
| log_end_time = acts_logger.get_log_line_timestamp() |
| else: |
| log_end_time = acts_logger.epoch_to_log_line_timestamp(end_time) |
| self.log.debug("Extracting adb log from logcat.") |
| logcat_path = os.path.join( |
| self.device_log_path, f"adblog_{self.serial}_debug.txt" |
| ) |
| if not os.path.exists(logcat_path): |
| self.log.warning(f"Logcat file {logcat_path} does not exist.") |
| return |
| adb_excerpt_dir = os.path.join(self.log_path, dest_path) |
| os.makedirs(adb_excerpt_dir, exist_ok=True) |
| out_name = "%s,%s.txt" % ( |
| acts_logger.normalize_log_line_timestamp(log_begin_time), |
| self.serial, |
| ) |
| tag_len = utils.MAX_FILENAME_LEN - len(out_name) |
| out_name = f"{tag[:tag_len]},{out_name}" |
| adb_excerpt_path = os.path.join(adb_excerpt_dir, out_name) |
| with open(adb_excerpt_path, "w", encoding="utf-8") as out: |
| in_file = logcat_path |
| with open(in_file, "r", encoding="utf-8", errors="replace") as f: |
| while True: |
| line = None |
| try: |
| line = f.readline() |
| if not line: |
| break |
| except: |
| continue |
| line_time = line[: acts_logger.log_line_timestamp_len] |
| if not acts_logger.is_valid_logline_timestamp(line_time): |
| continue |
| if self._is_timestamp_in_range( |
| line_time, log_begin_time, log_end_time |
| ): |
| if not line.endswith("\n"): |
| line += "\n" |
| out.write(line) |
| return adb_excerpt_path |
| |
| def search_logcat( |
| self, matching_string, begin_time=None, end_time=None, logcat_path=None |
| ): |
| """Search logcat message with given string. |
| |
| Args: |
| matching_string: matching_string to search. |
| begin_time: only the lines with time stamps later than begin_time |
| will be searched. |
| end_time: only the lines with time stamps earlier than end_time |
| will be searched. |
| logcat_path: the path of a specific file in which the search should |
| be performed. If None the path will be the default device log |
| path. |
| |
| Returns: |
| A list of dictionaries with full log message, time stamp string, |
| time object and message ID. For example: |
| [{"log_message": "05-03 17:39:29.898 968 1001 D" |
| "ActivityManager: Sending BOOT_COMPLETE user #0", |
| "time_stamp": "2017-05-03 17:39:29.898", |
| "datetime_obj": datetime object, |
| "message_id": None}] |
| |
| [{"log_message": "08-12 14:26:42.611043 2360 2510 D RILJ : " |
| "[0853]< DEACTIVATE_DATA_CALL [PHONE0]", |
| "time_stamp": "2020-08-12 14:26:42.611043", |
| "datetime_obj": datetime object}, |
| "message_id": "0853"}] |
| """ |
| if not logcat_path: |
| logcat_path = os.path.join( |
| self.device_log_path, f"adblog_{self.serial}_debug.txt" |
| ) |
| if not os.path.exists(logcat_path): |
| self.log.warning(f"Logcat file {logcat_path} does not exist.") |
| return |
| output = job.run(f"grep '{matching_string}' {logcat_path}", ignore_status=True) |
| if not output.stdout or output.exit_status != 0: |
| return [] |
| if begin_time: |
| if not isinstance(begin_time, datetime): |
| log_begin_time = acts_logger.epoch_to_log_line_timestamp(begin_time) |
| begin_time = datetime.strptime(log_begin_time, "%Y-%m-%d %H:%M:%S.%f") |
| if end_time: |
| if not isinstance(end_time, datetime): |
| log_end_time = acts_logger.epoch_to_log_line_timestamp(end_time) |
| end_time = datetime.strptime(log_end_time, "%Y-%m-%d %H:%M:%S.%f") |
| result = [] |
| logs = re.findall(r"(\S+\s\S+)(.*)", output.stdout) |
| for log in logs: |
| time_stamp = log[0] |
| time_obj = datetime.strptime(time_stamp, "%Y-%m-%d %H:%M:%S.%f") |
| |
| if begin_time and time_obj < begin_time: |
| continue |
| |
| if end_time and time_obj > end_time: |
| continue |
| |
| res = re.findall(r".*\[(\d+)\]", log[1]) |
| try: |
| message_id = res[0] |
| except: |
| message_id = None |
| |
| result.append( |
| { |
| "log_message": "".join(log), |
| "time_stamp": time_stamp, |
| "datetime_obj": time_obj, |
| "message_id": message_id, |
| } |
| ) |
| return result |
| |
| def start_adb_logcat(self): |
| """Starts a standing adb logcat collection in separate subprocesses and |
| save the logcat in a file. |
| """ |
| if self.is_adb_logcat_on: |
| self.log.warning( |
| "Android device %s already has a running adb logcat thread. " |
| % self.serial |
| ) |
| return |
| # Disable adb log spam filter. Have to stop and clear settings first |
| # because 'start' doesn't support --clear option before Android N. |
| self.adb.shell("logpersist.stop --clear", ignore_status=True) |
| self.adb.shell("logpersist.start", ignore_status=True) |
| if hasattr(self, "adb_logcat_param"): |
| extra_params = self.adb_logcat_param |
| else: |
| extra_params = "-b all" |
| |
| self.adb_logcat_process = logcat.create_logcat_keepalive_process( |
| self.serial, self.log_dir, extra_params |
| ) |
| self.adb_logcat_process.start() |
| |
| def stop_adb_logcat(self): |
| """Stops the adb logcat collection subprocess.""" |
| if not self.is_adb_logcat_on: |
| self.log.warning( |
| f"Android device {self.serial} does not have an ongoing adb logcat " |
| ) |
| return |
| # Set the last timestamp to the current timestamp. This may cause |
| # a race condition that allows the same line to be logged twice, |
| # but it does not pose a problem for our logging purposes. |
| self.adb_logcat_process.stop() |
| self.adb_logcat_process = None |
| |
| def get_apk_uid(self, apk_name): |
| """Get the uid of the given apk. |
| |
| Args: |
| apk_name: Name of the package, e.g., com.android.phone. |
| |
| Returns: |
| Linux UID for the apk. |
| """ |
| output = self.adb.shell( |
| f"dumpsys package {apk_name} | grep userId=", ignore_status=True |
| ) |
| result = re.search(r"userId=(\d+)", output) |
| if result: |
| return result.group(1) |
| else: |
| None |
| |
| def get_apk_version(self, package_name): |
| """Get the version of the given apk. |
| |
| Args: |
| package_name: Name of the package, e.g., com.android.phone. |
| |
| Returns: |
| Version of the given apk. |
| """ |
| try: |
| output = self.adb.shell( |
| f"dumpsys package {package_name} | grep versionName" |
| ) |
| pattern = re.compile(r"versionName=(.+)", re.I) |
| result = pattern.findall(output) |
| if result: |
| return result[0] |
| except Exception as e: |
| self.log.warning( |
| "Fail to get the version of package %s: %s", package_name, e |
| ) |
| self.log.debug("apk %s is not found", package_name) |
| return None |
| |
| def is_apk_installed(self, package_name): |
| """Check if the given apk is already installed. |
| |
| Args: |
| package_name: Name of the package, e.g., com.android.phone. |
| |
| Returns: |
| True if package is installed. False otherwise. |
| """ |
| |
| try: |
| return bool( |
| self.adb.shell( |
| f'(pm list packages | grep -w "package:{package_name}") || true' |
| ) |
| ) |
| |
| except Exception as err: |
| self.log.error( |
| "Could not determine if %s is installed. " "Received error:\n%s", |
| package_name, |
| err, |
| ) |
| return False |
| |
| def is_sl4a_installed(self): |
| return self.is_apk_installed(SL4A_APK_NAME) |
| |
| def is_apk_running(self, package_name): |
| """Check if the given apk is running. |
| |
| Args: |
| package_name: Name of the package, e.g., com.android.phone. |
| |
| Returns: |
| True if package is installed. False otherwise. |
| """ |
| for cmd in ("ps -A", "ps"): |
| try: |
| out = self.adb.shell( |
| f'{cmd} | grep "S {package_name}"', ignore_status=True |
| ) |
| if package_name in out: |
| self.log.info("apk %s is running", package_name) |
| return True |
| except Exception as e: |
| self.log.warning( |
| "Device fails to check is %s running by %s " "Exception %s", |
| package_name, |
| cmd, |
| e, |
| ) |
| continue |
| self.log.debug("apk %s is not running", package_name) |
| return False |
| |
| def is_sl4a_running(self): |
| return self.is_apk_running(SL4A_APK_NAME) |
| |
| def force_stop_apk(self, package_name): |
| """Force stop the given apk. |
| |
| Args: |
| package_name: Name of the package, e.g., com.android.phone. |
| |
| Returns: |
| True if package is installed. False otherwise. |
| """ |
| try: |
| self.adb.shell(f"am force-stop {package_name}", ignore_status=True) |
| except Exception as e: |
| self.log.warning("Fail to stop package %s: %s", package_name, e) |
| |
| def take_bug_report(self, test_name=None, begin_time=None): |
| """Takes a bug report on the device and stores it in a file. |
| |
| Args: |
| test_name: Name of the test case that triggered this bug report. |
| begin_time: Epoch time when the test started. If none is specified, |
| the current time will be used. |
| """ |
| self.adb.wait_for_device(timeout=WAIT_FOR_DEVICE_TIMEOUT) |
| new_br = True |
| try: |
| stdout = self.adb.shell("bugreportz -v") |
| # This check is necessary for builds before N, where adb shell's ret |
| # code and stderr are not propagated properly. |
| if "not found" in stdout: |
| new_br = False |
| except AdbError: |
| new_br = False |
| br_path = self.device_log_path |
| os.makedirs(br_path, exist_ok=True) |
| epoch = begin_time if begin_time else utils.get_current_epoch_time() |
| time_stamp = acts_logger.normalize_log_line_timestamp( |
| acts_logger.epoch_to_log_line_timestamp(epoch) |
| ) |
| out_name = f"AndroidDevice{self.serial}_{time_stamp}" |
| out_name = f"{out_name}.zip" if new_br else f"{out_name}.txt" |
| full_out_path = os.path.join(br_path, out_name) |
| # in case device restarted, wait for adb interface to return |
| self.wait_for_boot_completion() |
| if test_name: |
| self.log.info("Taking bugreport for %s.", test_name) |
| else: |
| self.log.info("Taking bugreport.") |
| if new_br: |
| out = self.adb.shell("bugreportz", timeout=BUG_REPORT_TIMEOUT) |
| if not out.startswith("OK"): |
| raise errors.AndroidDeviceError( |
| f"Failed to take bugreport on {self.serial}: {out}", |
| serial=self.serial, |
| ) |
| br_out_path = out.split(":")[1].strip().split()[0] |
| self.adb.pull(f"{br_out_path} {full_out_path}") |
| else: |
| self.adb.bugreport(f" > {full_out_path}", timeout=BUG_REPORT_TIMEOUT) |
| if test_name: |
| self.log.info("Bugreport for %s taken at %s.", test_name, full_out_path) |
| else: |
| self.log.info("Bugreport taken at %s.", test_name, full_out_path) |
| self.adb.wait_for_device(timeout=WAIT_FOR_DEVICE_TIMEOUT) |
| |
| def get_file_names( |
| self, directory, begin_time=None, skip_files=[], match_string=None |
| ): |
| """Get files names with provided directory.""" |
| cmd = f"find {directory} -type f" |
| if begin_time: |
| current_time = utils.get_current_epoch_time() |
| seconds = int(math.ceil((current_time - begin_time) / 1000.0)) |
| cmd = f"{cmd} -mtime -{seconds}s" |
| if match_string: |
| cmd = f"{cmd} -iname {match_string}" |
| for skip_file in skip_files: |
| cmd = f"{cmd} ! -iname {skip_file}" |
| out = self.adb.shell(cmd, ignore_status=True) |
| if ( |
| not out |
| or "No such" in out |
| or "Permission denied" in out |
| or "Not a directory" in out |
| ): |
| return [] |
| files = out.split("\n") |
| self.log.debug("Find files in directory %s: %s", directory, files) |
| return files |
| |
| @property |
| def external_storage_path(self): |
| """ |
| The $EXTERNAL_STORAGE path on the device. Most commonly set to '/sdcard' |
| """ |
| return self.adb.shell("echo $EXTERNAL_STORAGE") |
| |
| def file_exists(self, file_path): |
| """Returns whether a file exists on a device. |
| |
| Args: |
| file_path: The path of the file to check for. |
| """ |
| cmd = f"(test -f {file_path} && echo yes) || echo no" |
| result = self.adb.shell(cmd) |
| if result == "yes": |
| return True |
| elif result == "no": |
| return False |
| raise ValueError( |
| "Couldn't determine if %s exists. " |
| "Expected yes/no, got %s" % (file_path, result[cmd]) |
| ) |
| |
| def pull_files(self, device_paths, host_path=None): |
| """Pull files from devices. |
| |
| Args: |
| device_paths: List of paths on the device to pull from. |
| host_path: Destination path |
| """ |
| if isinstance(device_paths, str): |
| device_paths = [device_paths] |
| if not host_path: |
| host_path = self.log_path |
| for device_path in device_paths: |
| self.log.info(f"Pull from device: {device_path} -> {host_path}") |
| self.adb.pull(f"{device_path} {host_path}", timeout=PULL_TIMEOUT) |
| |
| def check_crash_report( |
| self, test_name=None, begin_time=None, log_crash_report=False |
| ): |
| """check crash report on the device.""" |
| crash_reports = [] |
| for crash_path in CRASH_REPORT_PATHS: |
| try: |
| cmd = f"cd {crash_path}" |
| self.adb.shell(cmd) |
| except Exception as e: |
| self.log.debug("received exception %s", e) |
| continue |
| crashes = self.get_file_names( |
| crash_path, skip_files=CRASH_REPORT_SKIPS, begin_time=begin_time |
| ) |
| if crash_path == "/data/tombstones/" and crashes: |
| tombstones = crashes[:] |
| for tombstone in tombstones: |
| if self.adb.shell( |
| f'cat {tombstone} | grep "crash_dump failed to dump process"' |
| ): |
| crashes.remove(tombstone) |
| if crashes: |
| crash_reports.extend(crashes) |
| if crash_reports and log_crash_report: |
| crash_log_path = os.path.join( |
| self.device_log_path, f"Crashes_{self.serial}" |
| ) |
| os.makedirs(crash_log_path, exist_ok=True) |
| self.pull_files(crash_reports, crash_log_path) |
| return crash_reports |
| |
| def get_qxdm_logs(self, test_name="", begin_time=None): |
| """Get qxdm logs.""" |
| # Sleep 10 seconds for the buffered log to be written in qxdm log file |
| time.sleep(10) |
| log_path = getattr(self, "qxdm_log_path", DEFAULT_QXDM_LOG_PATH) |
| qxdm_logs = self.get_file_names( |
| log_path, begin_time=begin_time, match_string="*.qmdl" |
| ) |
| if qxdm_logs: |
| qxdm_log_path = os.path.join(self.device_log_path, f"QXDM_{self.serial}") |
| os.makedirs(qxdm_log_path, exist_ok=True) |
| |
| self.log.info("Pull QXDM Log %s to %s", qxdm_logs, qxdm_log_path) |
| self.pull_files(qxdm_logs, qxdm_log_path) |
| |
| self.adb.pull( |
| f"/firmware/image/qdsp6m.qdb {qxdm_log_path}", |
| timeout=PULL_TIMEOUT, |
| ignore_status=True, |
| ) |
| # Zip Folder |
| utils.zip_directory(f"{qxdm_log_path}.zip", qxdm_log_path) |
| shutil.rmtree(qxdm_log_path) |
| else: |
| self.log.error(f"Didn't find QXDM logs in {log_path}.") |
| if "Verizon" in self.adb.getprop("gsm.sim.operator.alpha"): |
| omadm_log_path = os.path.join(self.device_log_path, f"OMADM_{self.serial}") |
| os.makedirs(omadm_log_path, exist_ok=True) |
| self.log.info("Pull OMADM Log") |
| self.adb.pull( |
| f"/data/data/com.android.omadm.service/files/dm/log/ {omadm_log_path}", |
| timeout=PULL_TIMEOUT, |
| ignore_status=True, |
| ) |
| |
| def get_sdm_logs(self, test_name="", begin_time=None): |
| """Get sdm logs.""" |
| # Sleep 10 seconds for the buffered log to be written in sdm log file |
| time.sleep(10) |
| log_paths = [ |
| ALWAYS_ON_LOG_PATH, |
| getattr(self, "sdm_log_path", DEFAULT_SDM_LOG_PATH), |
| ] |
| sdm_logs = [] |
| for path in log_paths: |
| sdm_logs += self.get_file_names( |
| path, begin_time=begin_time, match_string="*.sdm*" |
| ) |
| if sdm_logs: |
| sdm_log_path = os.path.join(self.device_log_path, f"SDM_{self.serial}") |
| os.makedirs(sdm_log_path, exist_ok=True) |
| self.log.info("Pull SDM Log %s to %s", sdm_logs, sdm_log_path) |
| self.pull_files(sdm_logs, sdm_log_path) |
| else: |
| self.log.error(f"Didn't find SDM logs in {log_paths}.") |
| if "Verizon" in self.adb.getprop("gsm.sim.operator.alpha"): |
| omadm_log_path = os.path.join(self.device_log_path, f"OMADM_{self.serial}") |
| os.makedirs(omadm_log_path, exist_ok=True) |
| self.log.info("Pull OMADM Log") |
| self.adb.pull( |
| f"/data/data/com.android.omadm.service/files/dm/log/ {omadm_log_path}", |
| timeout=PULL_TIMEOUT, |
| ignore_status=True, |
| ) |
| |
| def start_new_session(self, max_connections=None, server_port=None): |
| """Start a new session in sl4a. |
| |
| Also caches the droid in a dict with its uid being the key. |
| |
| Returns: |
| An Android object used to communicate with sl4a on the android |
| device. |
| |
| Raises: |
| Sl4aException: Something is wrong with sl4a and it returned an |
| existing uid to a new session. |
| """ |
| session = self._sl4a_manager.create_session( |
| max_connections=max_connections, server_port=server_port |
| ) |
| |
| self._sl4a_manager.sessions[session.uid] = session |
| return session.rpc_client |
| |
| def terminate_all_sessions(self): |
| """Terminate all sl4a sessions on the AndroidDevice instance. |
| |
| Terminate all sessions and clear caches. |
| """ |
| self._sl4a_manager.terminate_all_sessions() |
| |
| def run_iperf_client_nb( |
| self, server_host, extra_args="", timeout=IPERF_TIMEOUT, log_file_path=None |
| ): |
| """Start iperf client on the device asynchronously. |
| |
| Return status as true if iperf client start successfully. |
| And data flow information as results. |
| |
| Args: |
| server_host: Address of the iperf server. |
| extra_args: A string representing extra arguments for iperf client, |
| e.g. "-i 1 -t 30". |
| log_file_path: The complete file path to log the results. |
| |
| """ |
| cmd = f"iperf3 -c {server_host} {extra_args}" |
| if log_file_path: |
| cmd += f" --logfile {log_file_path} &" |
| self.adb.shell_nb(cmd) |
| |
| def run_iperf_client(self, server_host, extra_args="", timeout=IPERF_TIMEOUT): |
| """Start iperf client on the device. |
| |
| Return status as true if iperf client start successfully. |
| And data flow information as results. |
| |
| Args: |
| server_host: Address of the iperf server. |
| extra_args: A string representing extra arguments for iperf client, |
| e.g. "-i 1 -t 30". |
| |
| Returns: |
| status: true if iperf client start successfully. |
| results: results have data flow information |
| """ |
| out = self.adb.shell(f"iperf3 -c {server_host} {extra_args}", timeout=timeout) |
| clean_out = out.split("\n") |
| if "error" in clean_out[0].lower(): |
| return False, clean_out |
| return True, clean_out |
| |
| def run_iperf_server(self, extra_args=""): |
| """Start iperf server on the device |
| |
| Return status as true if iperf server started successfully. |
| |
| Args: |
| extra_args: A string representing extra arguments for iperf server. |
| |
| Returns: |
| status: true if iperf server started successfully. |
| results: results have output of command |
| """ |
| out = self.adb.shell(f"iperf3 -s {extra_args}") |
| clean_out = out.split("\n") |
| if "error" in clean_out[0].lower(): |
| return False, clean_out |
| return True, clean_out |
| |
| def wait_for_boot_completion(self, timeout=900.0): |
| """Waits for Android framework to broadcast ACTION_BOOT_COMPLETED. |
| |
| Args: |
| timeout: Seconds to wait for the device to boot. Default value is |
| 15 minutes. |
| """ |
| timeout_start = time.time() |
| |
| self.log.debug("ADB waiting for device") |
| self.adb.wait_for_device(timeout=timeout) |
| self.log.debug("Waiting for sys.boot_completed") |
| while time.time() < timeout_start + timeout: |
| try: |
| completed = self.adb.getprop("sys.boot_completed") |
| if completed == "1": |
| self.log.debug("Device has rebooted") |
| return |
| except AdbError: |
| # adb shell calls may fail during certain period of booting |
| # process, which is normal. Ignoring these errors. |
| pass |
| time.sleep(5) |
| raise errors.AndroidDeviceError( |
| f"Device {self.serial} booting process timed out.", serial=self.serial |
| ) |
| |
| def reboot( |
| self, stop_at_lock_screen=False, timeout=180, wait_after_reboot_complete=1 |
| ): |
| """Reboots the device. |
| |
| Terminate all sl4a sessions, reboot the device, wait for device to |
| complete booting, and restart an sl4a session if restart_sl4a is True. |
| |
| Args: |
| stop_at_lock_screen: whether to unlock after reboot. Set to False |
| if want to bring the device to reboot up to password locking |
| phase. Sl4a checking need the device unlocked after rebooting. |
| timeout: time in seconds to wait for the device to complete |
| rebooting. |
| wait_after_reboot_complete: time in seconds to wait after the boot |
| completion. |
| """ |
| if self.is_bootloader: |
| self.fastboot.reboot() |
| return |
| self.stop_services() |
| self.log.info("Rebooting") |
| self.adb.reboot() |
| |
| timeout_start = time.time() |
| # b/111791239: Newer versions of android sometimes return early after |
| # `adb reboot` is called. This means subsequent calls may make it to |
| # the device before the reboot goes through, return false positives for |
| # getprops such as sys.boot_completed. |
| while time.time() < timeout_start + timeout: |
| try: |
| self.adb.get_state() |
| time.sleep(0.1) |
| except AdbError: |
| # get_state will raise an error if the device is not found. We |
| # want the device to be missing to prove the device has kicked |
| # off the reboot. |
| break |
| self.wait_for_boot_completion(timeout=(timeout - time.time() + timeout_start)) |
| |
| self.log.debug("Wait for a while after boot completion.") |
| time.sleep(wait_after_reboot_complete) |
| self.root_adb() |
| skip_sl4a = self.skip_sl4a |
| self.skip_sl4a = self.skip_sl4a or stop_at_lock_screen |
| self.start_services() |
| self.skip_sl4a = skip_sl4a |
| |
| def restart_runtime(self): |
| """Restarts android runtime. |
| |
| Terminate all sl4a sessions, restarts runtime, wait for framework |
| complete restart, and restart an sl4a session if restart_sl4a is True. |
| """ |
| self.stop_services() |
| self.log.info("Restarting android runtime") |
| self.adb.shell("stop") |
| # Reset the boot completed flag before we restart the framework |
| # to correctly detect when the framework has fully come up. |
| self.adb.shell("setprop sys.boot_completed 0") |
| self.adb.shell("start") |
| self.wait_for_boot_completion() |
| self.root_adb() |
| |
| self.start_services() |
| |
| def get_ipv4_address(self, interface="wlan0", timeout=5): |
| for timer in range(0, timeout): |
| try: |
| ip_string = self.adb.shell(f"ifconfig {interface}|grep inet") |
| break |
| except adb.AdbError as e: |
| if timer + 1 == timeout: |
| self.log.warning(f"Unable to find IP address for {interface}.") |
| return None |
| else: |
| time.sleep(1) |
| result = re.search("addr:(.*) Bcast", ip_string) |
| if result != None: |
| ip_address = result.group(1) |
| try: |
| socket.inet_aton(ip_address) |
| return ip_address |
| except socket.error: |
| return None |
| else: |
| return None |
| |
| def get_ipv4_gateway(self, timeout=5): |
| for timer in range(0, timeout): |
| try: |
| gateway_string = self.adb.shell("dumpsys wifi | grep mDhcpResults") |
| break |
| except adb.AdbError as e: |
| if timer + 1 == timeout: |
| self.log.warning("Unable to find gateway") |
| return None |
| else: |
| time.sleep(1) |
| result = re.search("Gateway (.*) DNS servers", gateway_string) |
| if result != None: |
| ipv4_gateway = result.group(1) |
| try: |
| socket.inet_aton(ipv4_gateway) |
| return ipv4_gateway |
| except socket.error: |
| return None |
| else: |
| return None |
| |
| def send_keycode(self, keycode): |
| self.adb.shell(f"input keyevent KEYCODE_{keycode}") |
| |
| def get_my_current_focus_window(self): |
| """Get the current focus window on screen""" |
| output = self.adb.shell( |
| "dumpsys window displays | grep -E mCurrentFocus | grep -v null", |
| ignore_status=True, |
| ) |
| if not output or "not found" in output or "Can't find" in output: |
| result = "" |
| else: |
| result = output.split(" ")[-1].strip("}") |
| self.log.debug("Current focus window is %s", result) |
| return result |
| |
| def get_my_current_focus_app(self): |
| """Get the current focus application""" |
| dumpsys_cmd = [ |
| "dumpsys window | grep -E mFocusedApp", |
| "dumpsys window displays | grep -E mFocusedApp", |
| ] |
| for cmd in dumpsys_cmd: |
| output = self.adb.shell(cmd, ignore_status=True) |
| if ( |
| not output |
| or "not found" in output |
| or "Can't find" in output |
| or ("mFocusedApp=null" in output) |
| ): |
| result = "" |
| else: |
| result = output.split(" ")[-2] |
| break |
| self.log.debug("Current focus app is %s", result) |
| return result |
| |
| def is_window_ready(self, window_name=None): |
| current_window = self.get_my_current_focus_window() |
| if window_name: |
| return window_name in current_window |
| return current_window and ENCRYPTION_WINDOW not in current_window |
| |
| def wait_for_window_ready( |
| self, window_name=None, check_interval=5, check_duration=60 |
| ): |
| elapsed_time = 0 |
| while elapsed_time < check_duration: |
| if self.is_window_ready(window_name=window_name): |
| return True |
| time.sleep(check_interval) |
| elapsed_time += check_interval |
| self.log.info("Current focus window is %s", self.get_my_current_focus_window()) |
| return False |
| |
| def is_user_setup_complete(self): |
| return "1" in self.adb.shell("settings get secure user_setup_complete") |
| |
| def is_screen_awake(self): |
| """Check if device screen is in sleep mode""" |
| return "Awake" in self.adb.shell("dumpsys power | grep mWakefulness=") |
| |
| def is_screen_emergency_dialer(self): |
| """Check if device screen is in emergency dialer mode""" |
| return "EmergencyDialer" in self.get_my_current_focus_window() |
| |
| def is_screen_in_call_activity(self): |
| """Check if device screen is in in-call activity notification""" |
| return "InCallActivity" in self.get_my_current_focus_window() |
| |
| def is_setupwizard_on(self): |
| """Check if device screen is in emergency dialer mode""" |
| return "setupwizard" in self.get_my_current_focus_app() |
| |
| def is_screen_lock_enabled(self): |
| """Check if screen lock is enabled""" |
| cmd = "dumpsys window policy | grep showing=" |
| out = self.adb.shell(cmd, ignore_status=True) |
| return "true" in out |
| |
| def is_waiting_for_unlock_pin(self): |
| """Check if device is waiting for unlock pin to boot up""" |
| current_window = self.get_my_current_focus_window() |
| current_app = self.get_my_current_focus_app() |
| if ENCRYPTION_WINDOW in current_window: |
| self.log.info("Device is in CrpytKeeper window") |
| return True |
| if "StatusBar" in current_window and ( |
| (not current_app) or "FallbackHome" in current_app |
| ): |
| self.log.info("Device is locked") |
| return True |
| return False |
| |
| def ensure_screen_on(self): |
| """Ensure device screen is powered on""" |
| if self.is_screen_lock_enabled(): |
| for _ in range(2): |
| self.unlock_screen() |
| time.sleep(1) |
| if self.is_waiting_for_unlock_pin(): |
| self.unlock_screen(password=DEFAULT_DEVICE_PASSWORD) |
| time.sleep(1) |
| if ( |
| not self.is_waiting_for_unlock_pin() |
| and self.wait_for_window_ready() |
| ): |
| return True |
| return False |
| else: |
| self.wakeup_screen() |
| return True |
| |
| def wakeup_screen(self): |
| if not self.is_screen_awake(): |
| self.log.info("Screen is not awake, wake it up") |
| self.send_keycode("WAKEUP") |
| |
| def go_to_sleep(self): |
| if self.is_screen_awake(): |
| self.send_keycode("SLEEP") |
| |
| def send_keycode_number_pad(self, number): |
| self.send_keycode(f"NUMPAD_{number}") |
| |
| def unlock_screen(self, password=None): |
| self.log.info("Unlocking with %s", password or "swipe up") |
| # Bring device to SLEEP so that unlock process can start fresh |
| self.send_keycode("SLEEP") |
| time.sleep(1) |
| self.send_keycode("WAKEUP") |
| if ENCRYPTION_WINDOW not in self.get_my_current_focus_app(): |
| self.send_keycode("MENU") |
| if password: |
| self.send_keycode("DEL") |
| for number in password: |
| self.send_keycode_number_pad(number) |
| self.send_keycode("ENTER") |
| self.send_keycode("BACK") |
| |
| def screenshot(self, name=""): |
| """Take a screenshot on the device. |
| |
| Args: |
| name: additional information of screenshot on the file name. |
| """ |
| if name: |
| file_name = f"{DEFAULT_SCREENSHOT_PATH}_{name}" |
| file_name = f"{file_name}_{utils.get_current_epoch_time()}.png" |
| self.ensure_screen_on() |
| self.log.info("Log screenshot to %s", file_name) |
| try: |
| self.adb.shell(f"screencap -p {file_name}") |
| except: |
| self.log.error("Fail to log screenshot to %s", file_name) |
| |
| def exit_setup_wizard(self): |
| # Handling Android TV's setupwizard is ignored for now. |
| if "feature:android.hardware.type.television" in self.adb.shell( |
| "pm list features" |
| ): |
| return |
| if not self.is_user_setup_complete() or self.is_setupwizard_on(): |
| # b/116709539 need this to prevent reboot after skip setup wizard |
| self.adb.shell( |
| "am start -a com.android.setupwizard.EXIT", ignore_status=True |
| ) |
| self.adb.shell( |
| f"pm disable {self.get_setupwizard_package_name()}", |
| ignore_status=True, |
| ) |
| # Wait up to 5 seconds for user_setup_complete to be updated |
| end_time = time.time() + 5 |
| while time.time() < end_time: |
| if self.is_user_setup_complete() or not self.is_setupwizard_on(): |
| return |
| |
| # If fail to exit setup wizard, set local.prop and reboot |
| if not self.is_user_setup_complete() and self.is_setupwizard_on(): |
| self.adb.shell("echo ro.test_harness=1 > /data/local.prop") |
| self.adb.shell("chmod 644 /data/local.prop") |
| self.reboot(stop_at_lock_screen=True) |
| |
| def get_setupwizard_package_name(self): |
| """Finds setupwizard package/.activity |
| |
| Bypass setupwizard or setupwraith depending on device. |
| |
| Returns: |
| packageName/.ActivityName |
| """ |
| packages_to_skip = "'setupwizard|setupwraith'" |
| android_package_name = "com.google.android" |
| package = self.adb.shell( |
| "pm list packages -f | grep -E {} | grep {}".format( |
| packages_to_skip, android_package_name |
| ) |
| ) |
| wizard_package = package.split("=")[1] |
| activity = package.split("=")[0].split("/")[-2] |
| self.log.info(f"{wizard_package}/.{activity}Activity") |
| return f"{wizard_package}/.{activity}Activity" |
| |
| def push_system_file(self, src_file_path, dst_file_path, push_timeout=300): |
| """Pushes a file onto the read-only file system. |
| |
| For speed, the device is left in root mode after this call, and leaves |
| verity disabled. To re-enable verity, call ensure_verity_enabled(). |
| |
| Args: |
| src_file_path: The path to the system app to install. |
| dst_file_path: The destination of the file. |
| push_timeout: How long to wait for the push to finish. |
| Returns: |
| Whether or not the install was successful. |
| """ |
| self.adb.ensure_root() |
| try: |
| self.ensure_verity_disabled() |
| self.adb.remount() |
| out = self.adb.push( |
| f"{src_file_path} {dst_file_path}", timeout=push_timeout |
| ) |
| if "error" in out: |
| self.log.error( |
| "Unable to push system file %s to %s due to %s", |
| src_file_path, |
| dst_file_path, |
| out, |
| ) |
| return False |
| return True |
| except Exception as e: |
| self.log.error( |
| "Unable to push system file %s to %s due to %s", |
| src_file_path, |
| dst_file_path, |
| e, |
| ) |
| return False |
| |
| def ensure_verity_enabled(self): |
| """Ensures that verity is enabled. |
| |
| If verity is not enabled, this call will reboot the phone. Note that |
| this only works on debuggable builds. |
| """ |
| user = self.adb.get_user_id() |
| # The below properties will only exist if verity has been enabled. |
| system_verity = self.adb.getprop("partition.system.verified") |
| vendor_verity = self.adb.getprop("partition.vendor.verified") |
| if not system_verity or not vendor_verity: |
| self.adb.ensure_root() |
| self.adb.enable_verity() |
| self.reboot() |
| self.adb.ensure_user(user) |
| |
| def ensure_verity_disabled(self): |
| """Ensures that verity is disabled. |
| |
| If verity is enabled, this call will reboot the phone. |
| """ |
| user = self.adb.get_user_id() |
| # The below properties will only exist if verity has been enabled. |
| system_verity = self.adb.getprop("partition.system.verified") |
| vendor_verity = self.adb.getprop("partition.vendor.verified") |
| if system_verity or vendor_verity: |
| self.adb.ensure_root() |
| self.adb.disable_verity() |
| self.reboot() |
| self.adb.ensure_user(user) |
| |
| |
| class AndroidDeviceLoggerAdapter(logging.LoggerAdapter): |
| def process(self, msg, kwargs): |
| msg = f"[AndroidDevice|{self.extra['serial']}] {msg}" |
| return (msg, kwargs) |