Move to 2-space indent: AndroidDevice related files. (#705)
Pure shortening of the indentation length, no actual code change.
All files related to android_device, including the libs.
diff --git a/mobly/controllers/android_device.py b/mobly/controllers/android_device.py
index cdd680b..5a6dcb0 100644
--- a/mobly/controllers/android_device.py
+++ b/mobly/controllers/android_device.py
@@ -46,15 +46,15 @@
# System properties that are cached by the `AndroidDevice.build_info` property.
# The only properties on this list should be read-only system properties.
CACHED_SYSTEM_PROPS = [
- 'ro.build.id',
- 'ro.build.type',
- 'ro.build.version.codename',
- 'ro.build.version.sdk',
- 'ro.build.product',
- 'ro.build.characteristics',
- 'ro.debuggable',
- 'ro.product.name',
- 'ro.hardware',
+ 'ro.build.id',
+ 'ro.build.type',
+ 'ro.build.version.codename',
+ 'ro.build.version.sdk',
+ 'ro.build.product',
+ 'ro.build.characteristics',
+ 'ro.debuggable',
+ 'ro.product.name',
+ 'ro.hardware',
]
# Keys for attributes in configs that alternate the controller module behavior.
@@ -87,1035 +87,1035 @@
def create(configs):
- """Creates AndroidDevice controller objects.
+ """Creates AndroidDevice controller objects.
- Args:
- configs: A list of dicts, each representing a configuration for an
- Android device.
+ Args:
+ configs: A list of dicts, each representing a configuration for an
+ Android device.
- Returns:
- A list of AndroidDevice objects.
- """
- if not configs:
- raise Error(ANDROID_DEVICE_EMPTY_CONFIG_MSG)
- elif configs == ANDROID_DEVICE_PICK_ALL_TOKEN:
- ads = get_all_instances()
- elif not isinstance(configs, list):
- raise Error(ANDROID_DEVICE_NOT_LIST_CONFIG_MSG)
- elif isinstance(configs[0], dict):
- # Configs is a list of dicts.
- ads = get_instances_with_configs(configs)
- elif isinstance(configs[0], str):
- # Configs is a list of strings representing serials.
- ads = get_instances(configs)
- else:
- raise Error('No valid config found in: %s' % configs)
- valid_ad_identifiers = list_adb_devices() + list_adb_devices_by_usb_id()
+ Returns:
+ A list of AndroidDevice objects.
+ """
+ if not configs:
+ raise Error(ANDROID_DEVICE_EMPTY_CONFIG_MSG)
+ elif configs == ANDROID_DEVICE_PICK_ALL_TOKEN:
+ ads = get_all_instances()
+ elif not isinstance(configs, list):
+ raise Error(ANDROID_DEVICE_NOT_LIST_CONFIG_MSG)
+ elif isinstance(configs[0], dict):
+ # Configs is a list of dicts.
+ ads = get_instances_with_configs(configs)
+ elif isinstance(configs[0], str):
+ # Configs is a list of strings representing serials.
+ ads = get_instances(configs)
+ else:
+ raise Error('No valid config found in: %s' % configs)
+ valid_ad_identifiers = list_adb_devices() + list_adb_devices_by_usb_id()
- for ad in ads:
- if ad.serial not in valid_ad_identifiers:
- raise DeviceError(
- ad, 'Android device is specified in config but is not '
- 'attached.')
- _start_services_on_ads(ads)
- return ads
+ for ad in ads:
+ if ad.serial not in valid_ad_identifiers:
+ raise DeviceError(
+ ad, 'Android device is specified in config but is not '
+ 'attached.')
+ _start_services_on_ads(ads)
+ return ads
def destroy(ads):
- """Cleans up AndroidDevice objects.
+ """Cleans up AndroidDevice objects.
- Args:
- ads: A list of AndroidDevice objects.
- """
- for ad in ads:
- try:
- ad.services.stop_all()
- except:
- ad.log.exception('Failed to clean up properly.')
+ Args:
+ ads: A list of AndroidDevice objects.
+ """
+ for ad in ads:
+ try:
+ ad.services.stop_all()
+ except:
+ ad.log.exception('Failed to clean up properly.')
def get_info(ads):
- """Get information on a list of AndroidDevice objects.
+ """Get information on a list of AndroidDevice objects.
- Args:
- ads: A list of AndroidDevice objects.
+ Args:
+ ads: A list of AndroidDevice objects.
- Returns:
- A list of dict, each representing info for an AndroidDevice objects.
- """
- return [ad.device_info for ad in ads]
+ Returns:
+ A list of dict, each representing info for an AndroidDevice objects.
+ """
+ return [ad.device_info for ad in ads]
def _start_services_on_ads(ads):
- """Starts long running services on multiple AndroidDevice objects.
+ """Starts long running services on multiple AndroidDevice objects.
- If any one AndroidDevice object fails to start services, cleans up all
- AndroidDevice objects and their services.
+ If any one AndroidDevice object fails to start services, cleans up all
+ AndroidDevice objects and their services.
- Args:
- ads: A list of AndroidDevice objects whose services to start.
- """
- for ad in ads:
- start_logcat = not getattr(ad, KEY_SKIP_LOGCAT,
- DEFAULT_VALUE_SKIP_LOGCAT)
- try:
- if start_logcat:
- ad.services.logcat.start()
- except Exception:
- is_required = getattr(ad, KEY_DEVICE_REQUIRED,
- DEFAULT_VALUE_DEVICE_REQUIRED)
- if is_required:
- ad.log.exception('Failed to start some services, abort!')
- destroy(ads)
- raise
- else:
- ad.log.exception('Skipping this optional device because some '
- 'services failed to start.')
+ Args:
+ ads: A list of AndroidDevice objects whose services to start.
+ """
+ for ad in ads:
+ start_logcat = not getattr(ad, KEY_SKIP_LOGCAT,
+ DEFAULT_VALUE_SKIP_LOGCAT)
+ try:
+ if start_logcat:
+ ad.services.logcat.start()
+ except Exception:
+ is_required = getattr(ad, KEY_DEVICE_REQUIRED,
+ DEFAULT_VALUE_DEVICE_REQUIRED)
+ if is_required:
+ ad.log.exception('Failed to start some services, abort!')
+ destroy(ads)
+ raise
+ else:
+ ad.log.exception('Skipping this optional device because some '
+ 'services failed to start.')
def parse_device_list(device_list_str, key):
- """Parses a byte string representing a list of devices.
+ """Parses a byte string representing a list of devices.
- The string is generated by calling either adb or fastboot. The tokens in
- each string is tab-separated.
+ The string is generated by calling either adb or fastboot. The tokens in
+ each string is tab-separated.
- Args:
- device_list_str: Output of adb or fastboot.
- key: The token that signifies a device in device_list_str.
+ Args:
+ device_list_str: Output of adb or fastboot.
+ key: The token that signifies a device in device_list_str.
- Returns:
- A list of android device serial numbers.
- """
- try:
- clean_lines = str(device_list_str, 'utf-8').strip().split('\n')
- except UnicodeDecodeError:
- logging.warning("unicode decode error, origin str: %s", device_list_str)
- raise
- results = []
- for line in clean_lines:
- tokens = line.strip().split('\t')
- if len(tokens) == 2 and tokens[1] == key:
- results.append(tokens[0])
- return results
+ Returns:
+ A list of android device serial numbers.
+ """
+ try:
+ clean_lines = str(device_list_str, 'utf-8').strip().split('\n')
+ except UnicodeDecodeError:
+ logging.warning("unicode decode error, origin str: %s", device_list_str)
+ raise
+ results = []
+ for line in clean_lines:
+ tokens = line.strip().split('\t')
+ if len(tokens) == 2 and tokens[1] == key:
+ results.append(tokens[0])
+ return results
def list_adb_devices():
- """List all android devices connected to the computer that are detected by
- adb.
+ """List all android devices connected to the computer that are detected by
+ adb.
- Returns:
- A list of android device serials. Empty if there's none.
- """
- out = adb.AdbProxy().devices()
- return parse_device_list(out, 'device')
+ Returns:
+ A list of android device serials. Empty if there's none.
+ """
+ out = adb.AdbProxy().devices()
+ return parse_device_list(out, 'device')
def list_adb_devices_by_usb_id():
- """List the usb id of all android devices connected to the computer that
- are detected by adb.
+ """List the usb id of all android devices connected to the computer that
+ are detected by adb.
- Returns:
- A list of strings that are android device usb ids. Empty if there's
- none.
- """
- out = adb.AdbProxy().devices(['-l'])
- clean_lines = str(out, 'utf-8').strip().split('\n')
- results = []
- for line in clean_lines:
- tokens = line.strip().split()
- if len(tokens) > 2 and tokens[1] == 'device':
- results.append(tokens[2])
- return results
+ Returns:
+ A list of strings that are android device usb ids. Empty if there's
+ none.
+ """
+ out = adb.AdbProxy().devices(['-l'])
+ clean_lines = str(out, 'utf-8').strip().split('\n')
+ results = []
+ for line in clean_lines:
+ tokens = line.strip().split()
+ if len(tokens) > 2 and tokens[1] == 'device':
+ results.append(tokens[2])
+ return results
def list_fastboot_devices():
- """List all android devices connected to the computer that are in in
- fastboot mode. These are detected by fastboot.
+ """List all android devices connected to the computer that are in in
+ fastboot mode. These are detected by fastboot.
- Returns:
- A list of android device serials. Empty if there's none.
- """
- out = fastboot.FastbootProxy().devices()
- return parse_device_list(out, 'fastboot')
+ Returns:
+ A list of android device serials. Empty if there's none.
+ """
+ out = fastboot.FastbootProxy().devices()
+ return parse_device_list(out, 'fastboot')
def get_instances(serials):
- """Create AndroidDevice instances from a list of serials.
+ """Create AndroidDevice instances from a list of serials.
- Args:
- serials: A list of android device serials.
+ Args:
+ serials: A list of android device serials.
- Returns:
- A list of AndroidDevice objects.
- """
- results = []
- for s in serials:
- results.append(AndroidDevice(s))
- return results
+ Returns:
+ A list of AndroidDevice objects.
+ """
+ results = []
+ for s in serials:
+ results.append(AndroidDevice(s))
+ return results
def get_instances_with_configs(configs):
- """Create AndroidDevice instances from a list of dict configs.
+ """Create AndroidDevice instances from a list of dict configs.
- Each config should have the required key-value pair 'serial'.
+ Each config should have the required key-value pair 'serial'.
- Args:
- configs: A list of dicts each representing the configuration of one
- android device.
+ Args:
+ configs: A list of dicts each representing the configuration of one
+ android device.
- Returns:
- A list of AndroidDevice objects.
- """
- results = []
- for c in configs:
- try:
- serial = c.pop('serial')
- except KeyError:
- raise Error(
- 'Required value "serial" is missing in AndroidDevice config %s.'
- % c)
- is_required = c.get(KEY_DEVICE_REQUIRED, True)
- try:
- ad = AndroidDevice(serial)
- ad.load_config(c)
- except Exception:
- if is_required:
- raise
- ad.log.exception('Skipping this optional device due to error.')
- continue
- results.append(ad)
- return results
+ Returns:
+ A list of AndroidDevice objects.
+ """
+ results = []
+ for c in configs:
+ try:
+ serial = c.pop('serial')
+ except KeyError:
+ raise Error(
+ 'Required value "serial" is missing in AndroidDevice config %s.'
+ % c)
+ is_required = c.get(KEY_DEVICE_REQUIRED, True)
+ try:
+ ad = AndroidDevice(serial)
+ ad.load_config(c)
+ except Exception:
+ if is_required:
+ raise
+ ad.log.exception('Skipping this optional device due to error.')
+ continue
+ results.append(ad)
+ return results
def get_all_instances(include_fastboot=False):
- """Create AndroidDevice instances for all attached android devices.
+ """Create AndroidDevice instances for all attached android devices.
- Args:
- include_fastboot: Whether to include devices in bootloader mode or not.
+ Args:
+ include_fastboot: Whether to include devices in bootloader mode or not.
- Returns:
- A list of AndroidDevice objects each representing an android device
- attached to the computer.
- """
- if include_fastboot:
- serial_list = list_adb_devices() + list_fastboot_devices()
- return get_instances(serial_list)
- return get_instances(list_adb_devices())
+ Returns:
+ A list of AndroidDevice objects each representing an android device
+ attached to the computer.
+ """
+ if include_fastboot:
+ serial_list = list_adb_devices() + list_fastboot_devices()
+ return get_instances(serial_list)
+ return get_instances(list_adb_devices())
def filter_devices(ads, func):
- """Finds the AndroidDevice instances from a list that match certain
- conditions.
+ """Finds the AndroidDevice instances from a list that match certain
+ conditions.
- Args:
- ads: A list of AndroidDevice instances.
- func: A function that takes an AndroidDevice object and returns True
- if the device satisfies the filter condition.
+ Args:
+ ads: A list of AndroidDevice instances.
+ func: A function that takes an AndroidDevice object and returns True
+ if the device satisfies the filter condition.
- Returns:
- A list of AndroidDevice instances that satisfy the filter condition.
- """
- results = []
- for ad in ads:
- if func(ad):
- results.append(ad)
- return results
+ Returns:
+ A list of AndroidDevice instances that satisfy the filter condition.
+ """
+ results = []
+ for ad in ads:
+ if func(ad):
+ results.append(ad)
+ return results
def get_devices(ads, **kwargs):
- """Finds a list of AndroidDevice instance from a list that has specific
- attributes of certain values.
+ """Finds a list of AndroidDevice instance from a list that has specific
+ attributes of certain values.
- Example:
- get_devices(android_devices, label='foo', phone_number='1234567890')
- get_devices(android_devices, model='angler')
+ Example:
+ get_devices(android_devices, label='foo', phone_number='1234567890')
+ get_devices(android_devices, model='angler')
- Args:
- ads: A list of AndroidDevice instances.
- kwargs: keyword arguments used to filter AndroidDevice instances.
+ Args:
+ ads: A list of AndroidDevice instances.
+ kwargs: keyword arguments used to filter AndroidDevice instances.
- Returns:
- A list of target AndroidDevice instances.
+ Returns:
+ A list of target AndroidDevice instances.
- Raises:
- Error: No devices are matched.
- """
+ Raises:
+ Error: No devices are matched.
+ """
- def _get_device_filter(ad):
- for k, v in kwargs.items():
- if not hasattr(ad, k):
- return False
- elif getattr(ad, k) != v:
- return False
- return True
+ def _get_device_filter(ad):
+ for k, v in kwargs.items():
+ if not hasattr(ad, k):
+ return False
+ elif getattr(ad, k) != v:
+ return False
+ return True
- filtered = filter_devices(ads, _get_device_filter)
- if not filtered:
- raise Error(
- 'Could not find a target device that matches condition: %s.' %
- kwargs)
- else:
- return filtered
+ filtered = filter_devices(ads, _get_device_filter)
+ if not filtered:
+ raise Error(
+ 'Could not find a target device that matches condition: %s.' %
+ kwargs)
+ else:
+ return filtered
def get_device(ads, **kwargs):
- """Finds a unique AndroidDevice instance from a list that has specific
- attributes of certain values.
+ """Finds a unique AndroidDevice instance from a list that has specific
+ attributes of certain values.
- Example:
- get_device(android_devices, label='foo', phone_number='1234567890')
- get_device(android_devices, model='angler')
+ Example:
+ get_device(android_devices, label='foo', phone_number='1234567890')
+ get_device(android_devices, model='angler')
- Args:
- ads: A list of AndroidDevice instances.
- kwargs: keyword arguments used to filter AndroidDevice instances.
+ Args:
+ ads: A list of AndroidDevice instances.
+ kwargs: keyword arguments used to filter AndroidDevice instances.
- Returns:
- The target AndroidDevice instance.
+ Returns:
+ The target AndroidDevice instance.
- Raises:
- Error: None or more than one device is matched.
- """
+ Raises:
+ Error: None or more than one device is matched.
+ """
- filtered = get_devices(ads, **kwargs)
- if len(filtered) == 1:
- return filtered[0]
- else:
- serials = [ad.serial for ad in filtered]
- raise Error('More than one device matched: %s' % serials)
+ filtered = get_devices(ads, **kwargs)
+ if len(filtered) == 1:
+ return filtered[0]
+ else:
+ serials = [ad.serial for ad in filtered]
+ raise Error('More than one device matched: %s' % serials)
def take_bug_reports(ads, test_name=None, begin_time=None, destination=None):
- """Takes bug reports on a list of android devices.
+ """Takes bug reports on a list of android devices.
- If you want to take a bug report, call this function with a list of
- android_device objects in on_fail. But reports will be taken on all the
- devices in the list concurrently. Bug report takes a relative long
- time to take, so use this cautiously.
+ If you want to take a bug report, call this function with a list of
+ android_device objects in on_fail. But reports will be taken on all the
+ devices in the list concurrently. Bug report takes a relative long
+ time to take, so use this cautiously.
- Args:
- ads: A list of AndroidDevice instances.
- test_name: Name of the test method that triggered this bug report.
- If None, the default name "bugreport" will be used.
- begin_time: timestamp taken when the test started, can be either
- string or int. If None, the current time will be used.
- destination: string, path to the directory where the bugreport
- should be saved.
- """
- if begin_time is None:
- begin_time = mobly_logger.get_log_file_timestamp()
- else:
- begin_time = mobly_logger.sanitize_filename(str(begin_time))
+ Args:
+ ads: A list of AndroidDevice instances.
+ test_name: Name of the test method that triggered this bug report.
+ If None, the default name "bugreport" will be used.
+ begin_time: timestamp taken when the test started, can be either
+ string or int. If None, the current time will be used.
+ destination: string, path to the directory where the bugreport
+ should be saved.
+ """
+ if begin_time is None:
+ begin_time = mobly_logger.get_log_file_timestamp()
+ else:
+ begin_time = mobly_logger.sanitize_filename(str(begin_time))
- def take_br(test_name, begin_time, ad, destination):
- ad.take_bug_report(test_name=test_name,
- begin_time=begin_time,
- destination=destination)
+ def take_br(test_name, begin_time, ad, destination):
+ ad.take_bug_report(test_name=test_name,
+ begin_time=begin_time,
+ destination=destination)
- args = [(test_name, begin_time, ad, destination) for ad in ads]
- utils.concurrent_exec(take_br, args)
+ args = [(test_name, begin_time, ad, destination) for ad in ads]
+ utils.concurrent_exec(take_br, args)
class AndroidDevice(object):
- """Class representing an android device.
+ """Class representing an android device.
- Each object of this class represents one Android device in Mobly. This class
- provides various ways, like adb, fastboot, and Mobly snippets, to control
- an Android device, whether it's a real device or an emulator instance.
+ Each object of this class represents one Android device in Mobly. This class
+ provides various ways, like adb, fastboot, and Mobly snippets, to control
+ an Android device, whether it's a real device or an emulator instance.
- You can also register your own services to the device's service manager.
- See the docs of `service_manager` and `base_service` for details.
+ You can also register your own services to the device's service manager.
+ See the docs of `service_manager` and `base_service` for details.
- Attributes:
- serial: A string that's the serial number of the Androi device.
- log_path: A string that is the path where all logs collected on this
- android device should be stored.
- log: A logger adapted from root logger with an added prefix specific
- to an AndroidDevice instance. The default prefix is
- [AndroidDevice|<serial>]. Use self.debug_tag = 'tag' to use a
- different tag in the prefix.
- adb_logcat_file_path: A string that's the full path to the adb logcat
- file collected, if any.
- adb: An AdbProxy object used for interacting with the device via adb.
- fastboot: A FastbootProxy object used for interacting with the device
- via fastboot.
- services: ServiceManager, the manager of long-running services on the
- device.
+ Attributes:
+ serial: A string that's the serial number of the Androi device.
+ log_path: A string that is the path where all logs collected on this
+ android device should be stored.
+ log: A logger adapted from root logger with an added prefix specific
+ to an AndroidDevice instance. The default prefix is
+ [AndroidDevice|<serial>]. Use self.debug_tag = 'tag' to use a
+ different tag in the prefix.
+ adb_logcat_file_path: A string that's the full path to the adb logcat
+ file collected, if any.
+ adb: An AdbProxy object used for interacting with the device via adb.
+ fastboot: A FastbootProxy object used for interacting with the device
+ via fastboot.
+ services: ServiceManager, the manager of long-running services on the
+ device.
+ """
+
+ def __init__(self, serial=''):
+ self._serial = str(serial)
+ # logging.log_path only exists when this is used in an Mobly test run.
+ self._log_path_base = getattr(logging, 'log_path', '/tmp/logs')
+ self._log_path = os.path.join(
+ self._log_path_base, 'AndroidDevice%s' % self._normalized_serial)
+ self._debug_tag = self._serial
+ self.log = AndroidDeviceLoggerAdapter(logging.getLogger(),
+ {'tag': self.debug_tag})
+ self._build_info = None
+ self._is_rebooting = False
+ self.adb = adb.AdbProxy(serial)
+ self.fastboot = fastboot.FastbootProxy(serial)
+ if self.is_rootable:
+ self.root_adb()
+ self.services = service_manager.ServiceManager(self)
+ self.services.register(SERVICE_NAME_LOGCAT,
+ logcat.Logcat,
+ start_service=False)
+ self.services.register(
+ 'snippets', snippet_management_service.SnippetManagementService)
+ # Device info cache.
+ self._user_added_device_info = {}
+
+ def __repr__(self):
+ return '<AndroidDevice|%s>' % self.debug_tag
+
+ @property
+ def adb_logcat_file_path(self):
+ if self.services.has_service_by_name(SERVICE_NAME_LOGCAT):
+ return self.services.logcat.adb_logcat_file_path
+
+ @property
+ def _normalized_serial(self):
+ """Normalized serial name for usage in log filename.
+
+ Some Android emulators use ip:port as their serial names, while on
+ Windows `:` is not valid in filename, it should be sanitized first.
"""
+ if self._serial is None:
+ return None
+ return mobly_logger.sanitize_filename(self._serial)
- def __init__(self, serial=''):
- self._serial = str(serial)
- # logging.log_path only exists when this is used in an Mobly test run.
- self._log_path_base = getattr(logging, 'log_path', '/tmp/logs')
- self._log_path = os.path.join(
- self._log_path_base, 'AndroidDevice%s' % self._normalized_serial)
- self._debug_tag = self._serial
- self.log = AndroidDeviceLoggerAdapter(logging.getLogger(),
- {'tag': self.debug_tag})
- self._build_info = None
- self._is_rebooting = False
- self.adb = adb.AdbProxy(serial)
- self.fastboot = fastboot.FastbootProxy(serial)
- if self.is_rootable:
- self.root_adb()
- self.services = service_manager.ServiceManager(self)
- self.services.register(SERVICE_NAME_LOGCAT,
- logcat.Logcat,
- start_service=False)
- self.services.register(
- 'snippets', snippet_management_service.SnippetManagementService)
- # Device info cache.
- self._user_added_device_info = {}
+ @property
+ def device_info(self):
+ """Information to be pulled into controller info.
- def __repr__(self):
- return '<AndroidDevice|%s>' % self.debug_tag
+ The latest serial, model, and build_info are included. Additional info
+ can be added via `add_device_info`.
+ """
+ info = {
+ 'serial': self.serial,
+ 'model': self.model,
+ 'build_info': self.build_info,
+ 'user_added_info': self._user_added_device_info
+ }
+ return info
- @property
- def adb_logcat_file_path(self):
- if self.services.has_service_by_name(SERVICE_NAME_LOGCAT):
- return self.services.logcat.adb_logcat_file_path
+ def add_device_info(self, name, info):
+ """Add information of the device to be pulled into controller info.
- @property
- def _normalized_serial(self):
- """Normalized serial name for usage in log filename.
+ Adding the same info name the second time will override existing info.
- Some Android emulators use ip:port as their serial names, while on
- Windows `:` is not valid in filename, it should be sanitized first.
- """
- if self._serial is None:
- return None
- return mobly_logger.sanitize_filename(self._serial)
+ Args:
+ name: string, name of this info.
+ info: serializable, content of the info.
+ """
+ self._user_added_device_info.update({name: info})
- @property
- def device_info(self):
- """Information to be pulled into controller info.
+ @property
+ def sl4a(self):
+ """Attribute for direct access of sl4a client.
- The latest serial, model, and build_info are included. Additional info
- can be added via `add_device_info`.
- """
- info = {
- 'serial': self.serial,
- 'model': self.model,
- 'build_info': self.build_info,
- 'user_added_info': self._user_added_device_info
- }
- return info
+ Not recommended. This is here for backward compatibility reasons.
- def add_device_info(self, name, info):
- """Add information of the device to be pulled into controller info.
+ Preferred: directly access `ad.services.sl4a`.
+ """
+ if self.services.has_service_by_name('sl4a'):
+ return self.services.sl4a
- Adding the same info name the second time will override existing info.
+ @property
+ def ed(self):
+ """Attribute for direct access of sl4a's event dispatcher.
- Args:
- name: string, name of this info.
- info: serializable, content of the info.
- """
- self._user_added_device_info.update({name: info})
+ Not recommended. This is here for backward compatibility reasons.
- @property
- def sl4a(self):
- """Attribute for direct access of sl4a client.
+ Preferred: directly access `ad.services.sl4a.ed`.
+ """
+ if self.services.has_service_by_name('sl4a'):
+ return self.services.sl4a.ed
- Not recommended. This is here for backward compatibility reasons.
+ @property
+ def debug_tag(self):
+ """A string that represents a device object in debug info. Default value
+ is the device serial.
- Preferred: directly access `ad.services.sl4a`.
- """
- if self.services.has_service_by_name('sl4a'):
- return self.services.sl4a
+ This will be used as part of the prefix of debugging messages emitted by
+ this device object, like log lines and the message of DeviceError.
+ """
+ return self._debug_tag
- @property
- def ed(self):
- """Attribute for direct access of sl4a's event dispatcher.
+ @debug_tag.setter
+ def debug_tag(self, tag):
+ """Setter for the debug tag.
- Not recommended. This is here for backward compatibility reasons.
+ By default, the tag is the serial of the device, but sometimes it may
+ be more descriptive to use a different tag of the user's choice.
- Preferred: directly access `ad.services.sl4a.ed`.
- """
- if self.services.has_service_by_name('sl4a'):
- return self.services.sl4a.ed
+ Changing debug tag changes part of the prefix of debug info emitted by
+ this object, like log lines and the message of DeviceError.
- @property
- def debug_tag(self):
- """A string that represents a device object in debug info. Default value
- is the device serial.
+ Example:
+ By default, the device's serial number is used:
+ 'INFO [AndroidDevice|abcdefg12345] One pending call ringing.'
+ The tag can be customized with `ad.debug_tag = 'Caller'`:
+ 'INFO [AndroidDevice|Caller] One pending call ringing.'
+ """
+ self.log.info('Logging debug tag set to "%s"', tag)
+ self._debug_tag = tag
+ self.log.extra['tag'] = tag
- This will be used as part of the prefix of debugging messages emitted by
- this device object, like log lines and the message of DeviceError.
- """
- return self._debug_tag
+ @property
+ def has_active_service(self):
+ """True if any service is running on the device.
- @debug_tag.setter
- def debug_tag(self, tag):
- """Setter for the debug tag.
+ A service can be a snippet or logcat collection.
+ """
+ return self.services.is_any_alive
- By default, the tag is the serial of the device, but sometimes it may
- be more descriptive to use a different tag of the user's choice.
+ @property
+ def log_path(self):
+ """A string that is the path for all logs collected from this device.
+ """
+ return self._log_path
- Changing debug tag changes part of the prefix of debug info emitted by
- this object, like log lines and the message of DeviceError.
+ @log_path.setter
+ def log_path(self, new_path):
+ """Setter for `log_path`, use with caution."""
+ if self.has_active_service:
+ raise DeviceError(
+ self,
+ 'Cannot change `log_path` when there is service running.')
+ old_path = self._log_path
+ if new_path == old_path:
+ return
+ if os.listdir(new_path):
+ raise DeviceError(
+ self, 'Logs already exist at %s, cannot override.' % new_path)
+ if os.path.exists(old_path):
+ # Remove new path so copytree doesn't complain.
+ shutil.rmtree(new_path, ignore_errors=True)
+ shutil.copytree(old_path, new_path)
+ shutil.rmtree(old_path, ignore_errors=True)
+ self._log_path = new_path
- Example:
- By default, the device's serial number is used:
- 'INFO [AndroidDevice|abcdefg12345] One pending call ringing.'
- The tag can be customized with `ad.debug_tag = 'Caller'`:
- 'INFO [AndroidDevice|Caller] One pending call ringing.'
- """
- self.log.info('Logging debug tag set to "%s"', tag)
- self._debug_tag = tag
- self.log.extra['tag'] = tag
+ @property
+ def serial(self):
+ """The serial number used to identify a device.
- @property
- def has_active_service(self):
- """True if any service is running on the device.
+ This is essentially the value used for adb's `-s` arg, which means it
+ can be a network address or USB bus number.
+ """
+ return self._serial
- A service can be a snippet or logcat collection.
- """
- return self.services.is_any_alive
+ def update_serial(self, new_serial):
+ """Updates the serial number of a device.
- @property
- def log_path(self):
- """A string that is the path for all logs collected from this device.
- """
- return self._log_path
+ The "serial number" used with adb's `-s` arg is not necessarily the
+ actual serial number. For remote devices, it could be a combination of
+ host names and port numbers.
- @log_path.setter
- def log_path(self, new_path):
- """Setter for `log_path`, use with caution."""
- if self.has_active_service:
- raise DeviceError(
- self,
- 'Cannot change `log_path` when there is service running.')
- old_path = self._log_path
- if new_path == old_path:
- return
- if os.listdir(new_path):
- raise DeviceError(
- self, 'Logs already exist at %s, cannot override.' % new_path)
- if os.path.exists(old_path):
- # Remove new path so copytree doesn't complain.
- shutil.rmtree(new_path, ignore_errors=True)
- shutil.copytree(old_path, new_path)
- shutil.rmtree(old_path, ignore_errors=True)
- self._log_path = new_path
+ This is used for when such identifier of remote devices changes during
+ a test. For example, when a remote device reboots, it may come back
+ with a different serial number.
- @property
- def serial(self):
- """The serial number used to identify a device.
+ This is NOT meant for switching the object to represent another device.
- This is essentially the value used for adb's `-s` arg, which means it
- can be a network address or USB bus number.
- """
- return self._serial
+ We intentionally did not make it a regular setter of the serial
+ property so people don't accidentally call this without understanding
+ the consequences.
- def update_serial(self, new_serial):
- """Updates the serial number of a device.
+ Args:
+ new_serial: string, the new serial number for the same device.
- The "serial number" used with adb's `-s` arg is not necessarily the
- actual serial number. For remote devices, it could be a combination of
- host names and port numbers.
+ Raises:
+ DeviceError: tries to update serial when any service is running.
+ """
+ new_serial = str(new_serial)
+ if self.has_active_service:
+ raise DeviceError(
+ self,
+ 'Cannot change device serial number when there is service running.'
+ )
+ if self._debug_tag == self.serial:
+ self._debug_tag = new_serial
+ self._serial = new_serial
+ self.adb.serial = new_serial
+ self.fastboot.serial = new_serial
- This is used for when such identifier of remote devices changes during
- a test. For example, when a remote device reboots, it may come back
- with a different serial number.
+ @contextlib.contextmanager
+ def handle_reboot(self):
+ """Properly manage the service life cycle when the device needs to
+ temporarily disconnect.
- This is NOT meant for switching the object to represent another device.
+ The device can temporarily lose adb connection due to user-triggered
+ reboot. Use this function to make sure the services
+ started by Mobly are properly stopped and restored afterwards.
- We intentionally did not make it a regular setter of the serial
- property so people don't accidentally call this without understanding
- the consequences.
+ For sample usage, see self.reboot().
+ """
+ live_service_names = self.services.list_live_services()
+ self.services.stop_all()
+ # On rooted devices, system properties may change on reboot, so disable
+ # the `build_info` cache by setting `_is_rebooting` to True and
+ # repopulate it after reboot.
+ # Note, this logic assumes that instance variable assignment in Python
+ # is atomic; otherwise, `threading` data structures would be necessary.
+ # Additionally, nesting calls to `handle_reboot` while changing the
+ # read-only property values during reboot will result in stale values.
+ self._is_rebooting = True
+ try:
+ yield
+ finally:
+ self.wait_for_boot_completion()
+ # On boot completion, invalidate the `build_info` cache since any
+ # value it had from before boot completion is potentially invalid.
+ # If the value gets set after the final invalidation and before
+ # setting`_is_rebooting` to True, then that's okay because the
+ # device has finished rebooting at that point, and values at that
+ # point should be valid.
+ # If the reboot fails for some reason, then `_is_rebooting` is never
+ # set to False, which means the `build_info` cache remains disabled
+ # until the next reboot. This is relatively okay because the
+ # `build_info` cache is only minimizes adb commands.
+ self._build_info = None
+ self._is_rebooting = False
+ if self.is_rootable:
+ self.root_adb()
+ self.services.start_services(live_service_names)
- Args:
- new_serial: string, the new serial number for the same device.
+ @contextlib.contextmanager
+ def handle_usb_disconnect(self):
+ """Properly manage the service life cycle when USB is disconnected.
- Raises:
- DeviceError: tries to update serial when any service is running.
- """
- new_serial = str(new_serial)
- if self.has_active_service:
- raise DeviceError(
- self,
- 'Cannot change device serial number when there is service running.'
- )
- if self._debug_tag == self.serial:
- self._debug_tag = new_serial
- self._serial = new_serial
- self.adb.serial = new_serial
- self.fastboot.serial = new_serial
+ The device can temporarily lose adb connection due to user-triggered
+ USB disconnection, e.g. the following cases can be handled by this
+ method:
- @contextlib.contextmanager
- def handle_reboot(self):
- """Properly manage the service life cycle when the device needs to
- temporarily disconnect.
+ * Power measurement: Using Monsoon device to measure battery consumption
+ would potentially disconnect USB.
+ * Unplug USB so device loses connection.
+ * ADB connection over WiFi and WiFi got disconnected.
+ * Any other type of USB disconnection, as long as snippet session can
+ be kept alive while USB disconnected (reboot caused USB
+ disconnection is not one of these cases because snippet session
+ cannot survive reboot.
+ Use handle_reboot() instead).
- The device can temporarily lose adb connection due to user-triggered
- reboot. Use this function to make sure the services
- started by Mobly are properly stopped and restored afterwards.
+ Use this function to make sure the services started by Mobly are
+ properly reconnected afterwards.
- For sample usage, see self.reboot().
- """
- live_service_names = self.services.list_live_services()
- self.services.stop_all()
- # On rooted devices, system properties may change on reboot, so disable
- # the `build_info` cache by setting `_is_rebooting` to True and
- # repopulate it after reboot.
- # Note, this logic assumes that instance variable assignment in Python
- # is atomic; otherwise, `threading` data structures would be necessary.
- # Additionally, nesting calls to `handle_reboot` while changing the
- # read-only property values during reboot will result in stale values.
- self._is_rebooting = True
+ Just like the usage of self.handle_reboot(), this method does not
+ automatically detect if the disconnection is because of a reboot or USB
+ disconnect. Users of this function should make sure the right handle_*
+ function is used to handle the correct type of disconnection.
+
+ This method also reconnects snippet event client. Therefore, the
+ callback objects created (by calling Async RPC methods) before
+ disconnection would still be valid and can be used to retrieve RPC
+ execution result after device got reconnected.
+
+ Example Usage:
+
+ .. code-block:: python
+
+ with ad.handle_usb_disconnect():
try:
- yield
+ # User action that triggers USB disconnect, could throw
+ # exceptions.
+ do_something()
finally:
- self.wait_for_boot_completion()
- # On boot completion, invalidate the `build_info` cache since any
- # value it had from before boot completion is potentially invalid.
- # If the value gets set after the final invalidation and before
- # setting`_is_rebooting` to True, then that's okay because the
- # device has finished rebooting at that point, and values at that
- # point should be valid.
- # If the reboot fails for some reason, then `_is_rebooting` is never
- # set to False, which means the `build_info` cache remains disabled
- # until the next reboot. This is relatively okay because the
- # `build_info` cache is only minimizes adb commands.
- self._build_info = None
- self._is_rebooting = False
- if self.is_rootable:
- self.root_adb()
- self.services.start_services(live_service_names)
+ # User action that triggers USB reconnect
+ action_that_reconnects_usb()
+ # Make sure device is reconnected before returning from this
+ # context
+ ad.adb.wait_for_device(timeout=SOME_TIMEOUT)
+ """
+ live_service_names = self.services.list_live_services()
+ self.services.pause_all()
+ try:
+ yield
+ finally:
+ self.services.resume_services(live_service_names)
- @contextlib.contextmanager
- def handle_usb_disconnect(self):
- """Properly manage the service life cycle when USB is disconnected.
+ @property
+ def build_info(self):
+ """Get the build info of this Android device, including build id and
+ build type.
- The device can temporarily lose adb connection due to user-triggered
- USB disconnection, e.g. the following cases can be handled by this
- method:
+ This is not available if the device is in bootloader mode.
- * Power measurement: Using Monsoon device to measure battery consumption
- would potentially disconnect USB.
- * Unplug USB so device loses connection.
- * ADB connection over WiFi and WiFi got disconnected.
- * Any other type of USB disconnection, as long as snippet session can
- be kept alive while USB disconnected (reboot caused USB
- disconnection is not one of these cases because snippet session
- cannot survive reboot.
- Use handle_reboot() instead).
+ Returns:
+ A dict with the build info of this Android device, or None if the
+ device is in bootloader mode.
+ """
+ if self.is_bootloader:
+ self.log.error('Device is in fastboot mode, could not get build '
+ 'info.')
+ return
+ if self._build_info is None or self._is_rebooting:
+ info = {}
+ build_info = self.adb.getprops(CACHED_SYSTEM_PROPS)
+ info['build_id'] = build_info['ro.build.id']
+ info['build_type'] = build_info['ro.build.type']
+ info['build_version_codename'] = build_info.get(
+ 'ro.build.version.codename', '')
+ info['build_version_sdk'] = build_info.get('ro.build.version.sdk',
+ '')
+ info['build_product'] = build_info.get('ro.build.product', '')
+ info['build_characteristics'] = build_info.get(
+ 'ro.build.characteristics', '')
+ info['debuggable'] = build_info.get('ro.debuggable', '')
+ info['product_name'] = build_info.get('ro.product.name', '')
+ info['hardware'] = build_info.get('ro.hardware', '')
+ self._build_info = info
+ return info
+ return self._build_info
- Use this function to make sure the services started by Mobly are
- properly reconnected afterwards.
+ @property
+ def is_bootloader(self):
+ """True if the device is in bootloader mode.
+ """
+ return self.serial in list_fastboot_devices()
- Just like the usage of self.handle_reboot(), this method does not
- automatically detect if the disconnection is because of a reboot or USB
- disconnect. Users of this function should make sure the right handle_*
- function is used to handle the correct type of disconnection.
+ @property
+ def is_adb_root(self):
+ """True if adb is running as root for this device.
+ """
+ try:
+ return '0' == self.adb.shell('id -u').decode('utf-8').strip()
+ except adb.AdbError:
+ # Wait a bit and retry to work around adb flakiness for this cmd.
+ time.sleep(0.2)
+ return '0' == self.adb.shell('id -u').decode('utf-8').strip()
- This method also reconnects snippet event client. Therefore, the
- callback objects created (by calling Async RPC methods) before
- disconnection would still be valid and can be used to retrieve RPC
- execution result after device got reconnected.
+ @property
+ def is_rootable(self):
+ return not self.is_bootloader and self.build_info['debuggable'] == '1'
- Example Usage:
+ @property
+ def model(self):
+ """The Android code name for the device.
+ """
+ # If device is in bootloader mode, get mode name from fastboot.
+ if self.is_bootloader:
+ out = self.fastboot.getvar('product').strip()
+ # 'out' is never empty because of the 'total time' message fastboot
+ # writes to stderr.
+ lines = out.decode('utf-8').split('\n', 1)
+ if lines:
+ tokens = lines[0].split(' ')
+ if len(tokens) > 1:
+ return tokens[1].lower()
+ return None
+ model = self.build_info['build_product'].lower()
+ if model == 'sprout':
+ return model
+ return self.build_info['product_name'].lower()
- .. code-block:: python
+ @property
+ def is_emulator(self):
+ """Whether this device is probably an emulator.
- with ad.handle_usb_disconnect():
- try:
- # User action that triggers USB disconnect, could throw
- # exceptions.
- do_something()
- finally:
- # User action that triggers USB reconnect
- action_that_reconnects_usb()
- # Make sure device is reconnected before returning from this
- # context
- ad.adb.wait_for_device(timeout=SOME_TIMEOUT)
- """
- live_service_names = self.services.list_live_services()
- self.services.pause_all()
- try:
- yield
- finally:
- self.services.resume_services(live_service_names)
+ Returns:
+ True if this is probably an emulator.
+ """
+ if EMULATOR_SERIAL_REGEX.match(self.serial):
+ # If the device's serial follows 'emulator-dddd', then it's almost
+ # certainly an emulator.
+ return True
+ elif self.build_info['build_characteristics'] == 'emulator':
+ # If the device says that it's an emulator, then it's probably an
+ # emulator although some real devices apparently report themselves
+ # as emulators in addition to other things, so only return True on
+ # an exact match.
+ return True
+ elif self.build_info['hardware'] in ['ranchu', 'goldfish']:
+ # Ranchu and Goldfish are the hardware properties that the AOSP
+ # emulators report, so if the device says it's an AOSP emulator, it
+ # probably is one.
+ return True
+ else:
+ return False
- @property
- def build_info(self):
- """Get the build info of this Android device, including build id and
- build type.
+ def load_config(self, config):
+ """Add attributes to the AndroidDevice object based on config.
- This is not available if the device is in bootloader mode.
+ Args:
+ config: A dictionary representing the configs.
- Returns:
- A dict with the build info of this Android device, or None if the
- device is in bootloader mode.
- """
- if self.is_bootloader:
- self.log.error('Device is in fastboot mode, could not get build '
- 'info.')
- return
- if self._build_info is None or self._is_rebooting:
- info = {}
- build_info = self.adb.getprops(CACHED_SYSTEM_PROPS)
- info['build_id'] = build_info['ro.build.id']
- info['build_type'] = build_info['ro.build.type']
- info['build_version_codename'] = build_info.get(
- 'ro.build.version.codename', '')
- info['build_version_sdk'] = build_info.get('ro.build.version.sdk',
- '')
- info['build_product'] = build_info.get('ro.build.product', '')
- info['build_characteristics'] = build_info.get(
- 'ro.build.characteristics', '')
- info['debuggable'] = build_info.get('ro.debuggable', '')
- info['product_name'] = build_info.get('ro.product.name', '')
- info['hardware'] = build_info.get('ro.hardware', '')
- self._build_info = info
- return info
- return self._build_info
+ Raises:
+ Error: The config is trying to overwrite an existing attribute.
+ """
+ for k, v in config.items():
+ if hasattr(self, k) and k not in _ANDROID_DEVICE_SETTABLE_PROPS:
+ raise DeviceError(
+ self,
+ ('Attribute %s already exists with value %s, cannot set '
+ 'again.') % (k, getattr(self, k)))
+ setattr(self, k, v)
- @property
- def is_bootloader(self):
- """True if the device is in bootloader mode.
- """
- return self.serial in list_fastboot_devices()
+ def root_adb(self):
+ """Change adb to root mode for this device if allowed.
- @property
- def is_adb_root(self):
- """True if adb is running as root for this device.
- """
- try:
- return '0' == self.adb.shell('id -u').decode('utf-8').strip()
- except adb.AdbError:
- # Wait a bit and retry to work around adb flakiness for this cmd.
- time.sleep(0.2)
- return '0' == self.adb.shell('id -u').decode('utf-8').strip()
+ If executed on a production build, adb will not be switched to root
+ mode per security restrictions.
+ """
+ self.adb.root()
+ # `root` causes the device to temporarily disappear from adb.
+ # So we need to wait for the device to come back before proceeding.
+ self.adb.wait_for_device(
+ timeout=DEFAULT_TIMEOUT_BOOT_COMPLETION_SECOND)
- @property
- def is_rootable(self):
- return not self.is_bootloader and self.build_info['debuggable'] == '1'
+ def load_snippet(self, name, package):
+ """Starts the snippet apk with the given package name and connects.
- @property
- def model(self):
- """The Android code name for the device.
- """
- # If device is in bootloader mode, get mode name from fastboot.
- if self.is_bootloader:
- out = self.fastboot.getvar('product').strip()
- # 'out' is never empty because of the 'total time' message fastboot
- # writes to stderr.
- lines = out.decode('utf-8').split('\n', 1)
- if lines:
- tokens = lines[0].split(' ')
- if len(tokens) > 1:
- return tokens[1].lower()
- return None
- model = self.build_info['build_product'].lower()
- if model == 'sprout':
- return model
- return self.build_info['product_name'].lower()
+ Examples:
- @property
- def is_emulator(self):
- """Whether this device is probably an emulator.
+ .. code-block:: python
- Returns:
- True if this is probably an emulator.
- """
- if EMULATOR_SERIAL_REGEX.match(self.serial):
- # If the device's serial follows 'emulator-dddd', then it's almost
- # certainly an emulator.
- return True
- elif self.build_info['build_characteristics'] == 'emulator':
- # If the device says that it's an emulator, then it's probably an
- # emulator although some real devices apparently report themselves
- # as emulators in addition to other things, so only return True on
- # an exact match.
- return True
- elif self.build_info['hardware'] in ['ranchu', 'goldfish']:
- # Ranchu and Goldfish are the hardware properties that the AOSP
- # emulators report, so if the device says it's an AOSP emulator, it
- # probably is one.
- return True
- else:
- return False
+ ad.load_snippet(
+ name='maps', package='com.google.maps.snippets')
+ ad.maps.activateZoom('3')
- def load_config(self, config):
- """Add attributes to the AndroidDevice object based on config.
+ Args:
+ name: string, the attribute name to which to attach the snippet
+ client. E.g. `name='maps'` attaches the snippet client to
+ `ad.maps`.
+ package: string, the package name of the snippet apk to connect to.
- Args:
- config: A dictionary representing the configs.
+ Raises:
+ SnippetError: Illegal load operations are attempted.
+ """
+ # Should not load snippet with an existing attribute.
+ if hasattr(self, name):
+ raise SnippetError(
+ self,
+ 'Attribute "%s" already exists, please use a different name.' %
+ name)
+ self.services.snippets.add_snippet_client(name, package)
- Raises:
- Error: The config is trying to overwrite an existing attribute.
- """
- for k, v in config.items():
- if hasattr(self, k) and k not in _ANDROID_DEVICE_SETTABLE_PROPS:
- raise DeviceError(
- self,
- ('Attribute %s already exists with value %s, cannot set '
- 'again.') % (k, getattr(self, k)))
- setattr(self, k, v)
+ def unload_snippet(self, name):
+ """Stops a snippet apk.
- def root_adb(self):
- """Change adb to root mode for this device if allowed.
+ Args:
+ name: The attribute name the snippet server is attached with.
- If executed on a production build, adb will not be switched to root
- mode per security restrictions.
- """
- self.adb.root()
- # `root` causes the device to temporarily disappear from adb.
- # So we need to wait for the device to come back before proceeding.
- self.adb.wait_for_device(
- timeout=DEFAULT_TIMEOUT_BOOT_COMPLETION_SECOND)
+ Raises:
+ SnippetError: The given snippet name is not registered.
+ """
+ self.services.snippets.remove_snippet_client(name)
- def load_snippet(self, name, package):
- """Starts the snippet apk with the given package name and connects.
+ def generate_filename(self,
+ file_type,
+ time_identifier=None,
+ extension_name=None):
+ """Generates a name for an output file related to this device.
- Examples:
+ The name follows the pattern:
- .. code-block:: python
+ {file type},{debug_tag},{serial},{model},{time identifier}.{ext}
- ad.load_snippet(
- name='maps', package='com.google.maps.snippets')
- ad.maps.activateZoom('3')
+ "debug_tag" is only added if it's different from the serial. "ext" is
+ added if specified by user.
- Args:
- name: string, the attribute name to which to attach the snippet
- client. E.g. `name='maps'` attaches the snippet client to
- `ad.maps`.
- package: string, the package name of the snippet apk to connect to.
+ Args:
+ file_type: string, type of this file, like "logcat" etc.
+ time_identifier: string or RuntimeTestInfo. If a `RuntimeTestInfo`
+ is passed in, the `signature` of the test case will be used. If
+ a string is passed in, the string itself will be used.
+ Otherwise the current timestamp will be used.
+ extension_name: string, the extension name of the file.
- Raises:
- SnippetError: Illegal load operations are attempted.
- """
- # Should not load snippet with an existing attribute.
- if hasattr(self, name):
- raise SnippetError(
- self,
- 'Attribute "%s" already exists, please use a different name.' %
- name)
- self.services.snippets.add_snippet_client(name, package)
+ Returns:
+ String, the filename generated.
+ """
+ time_str = time_identifier
+ if time_identifier is None:
+ time_str = mobly_logger.get_log_file_timestamp()
+ elif isinstance(time_identifier, runtime_test_info.RuntimeTestInfo):
+ time_str = time_identifier.signature
+ filename_tokens = [file_type]
+ if self.debug_tag != self.serial:
+ filename_tokens.append(self.debug_tag)
+ filename_tokens.extend([self.serial, self.model, time_str])
+ filename_str = ','.join(filename_tokens)
+ if extension_name is not None:
+ filename_str = '%s.%s' % (filename_str, extension_name)
+ filename_str = mobly_logger.sanitize_filename(filename_str)
+ self.log.debug('Generated filename: %s', filename_str)
+ return filename_str
- def unload_snippet(self, name):
- """Stops a snippet apk.
+ def take_bug_report(self,
+ test_name=None,
+ begin_time=None,
+ timeout=300,
+ destination=None):
+ """Takes a bug report on the device and stores it in a file.
- Args:
- name: The attribute name the snippet server is attached with.
+ Args:
+ test_name: Name of the test method that triggered this bug report.
+ begin_time: Timestamp of when the test started. If not set, then
+ this will default to the current time.
+ timeout: float, the number of seconds to wait for bugreport to
+ complete, default is 5min.
+ destination: string, path to the directory where the bugreport
+ should be saved.
- Raises:
- SnippetError: The given snippet name is not registered.
- """
- self.services.snippets.remove_snippet_client(name)
+ Returns:
+ A string that is the absolute path to the bug report on the host.
+ """
+ prefix = DEFAULT_BUG_REPORT_NAME
+ if test_name:
+ prefix = '%s,%s' % (DEFAULT_BUG_REPORT_NAME, test_name)
+ if begin_time is None:
+ begin_time = mobly_logger.get_log_file_timestamp()
- def generate_filename(self,
- file_type,
- time_identifier=None,
- extension_name=None):
- """Generates a name for an output file related to this device.
+ new_br = True
+ try:
+ stdout = self.adb.shell('bugreportz -v').decode('utf-8')
+ # This check is necessary for builds before N, where adb shell's ret
+ # code and stderr are not propagated properly.
+ if 'not found' in stdout:
+ new_br = False
+ except adb.AdbError:
+ new_br = False
- The name follows the pattern:
+ if destination is None:
+ destination = os.path.join(self.log_path, 'BugReports')
+ br_path = utils.abs_path(destination)
+ utils.create_dir(br_path)
+ filename = self.generate_filename(prefix, str(begin_time), 'txt')
+ if new_br:
+ filename = filename.replace('.txt', '.zip')
+ full_out_path = os.path.join(br_path, filename)
+ # in case device restarted, wait for adb interface to return
+ self.wait_for_boot_completion()
+ self.log.debug('Start taking bugreport.')
+ if new_br:
+ out = self.adb.shell('bugreportz', timeout=timeout).decode('utf-8')
+ if not out.startswith('OK'):
+ raise DeviceError(self, 'Failed to take bugreport: %s' % out)
+ br_out_path = out.split(':')[1].strip()
+ self.adb.pull([br_out_path, full_out_path])
+ self.adb.shell(['rm', br_out_path])
+ else:
+ # shell=True as this command redirects the stdout to a local file
+ # using shell redirection.
+ self.adb.bugreport(' > "%s"' % full_out_path,
+ shell=True,
+ timeout=timeout)
+ self.log.debug('Bugreport taken at %s.', full_out_path)
+ return full_out_path
- {file type},{debug_tag},{serial},{model},{time identifier}.{ext}
+ def take_screenshot(self, destination):
+ """Takes a screenshot of the device.
- "debug_tag" is only added if it's different from the serial. "ext" is
- added if specified by user.
+ Args:
+ destination: string, full path to the directory to save in.
- Args:
- file_type: string, type of this file, like "logcat" etc.
- time_identifier: string or RuntimeTestInfo. If a `RuntimeTestInfo`
- is passed in, the `signature` of the test case will be used. If
- a string is passed in, the string itself will be used.
- Otherwise the current timestamp will be used.
- extension_name: string, the extension name of the file.
+ Returns:
+ string, full path to the screenshot file on the host.
+ """
+ filename = self.generate_filename('screenshot', extension_name='png')
+ device_path = os.path.join('/storage/emulated/0/', filename)
+ self.adb.shell(['screencap', '-p', device_path],
+ timeout=TAKE_SCREENSHOT_TIMEOUT_SECOND)
+ utils.create_dir(destination)
+ self.adb.pull([device_path, destination])
+ pic_path = os.path.join(destination, filename)
+ self.log.debug('Screenshot taken, saved on the host: %s', pic_path)
+ self.adb.shell(['rm', device_path])
+ return pic_path
- Returns:
- String, the filename generated.
- """
- time_str = time_identifier
- if time_identifier is None:
- time_str = mobly_logger.get_log_file_timestamp()
- elif isinstance(time_identifier, runtime_test_info.RuntimeTestInfo):
- time_str = time_identifier.signature
- filename_tokens = [file_type]
- if self.debug_tag != self.serial:
- filename_tokens.append(self.debug_tag)
- filename_tokens.extend([self.serial, self.model, time_str])
- filename_str = ','.join(filename_tokens)
- if extension_name is not None:
- filename_str = '%s.%s' % (filename_str, extension_name)
- filename_str = mobly_logger.sanitize_filename(filename_str)
- self.log.debug('Generated filename: %s', filename_str)
- return filename_str
+ def run_iperf_client(self, server_host, extra_args=''):
+ """Start iperf client on the device.
- def take_bug_report(self,
- test_name=None,
- begin_time=None,
- timeout=300,
- destination=None):
- """Takes a bug report on the device and stores it in a file.
+ Return status as true if iperf client start successfully.
+ And data flow information as results.
- Args:
- test_name: Name of the test method that triggered this bug report.
- begin_time: Timestamp of when the test started. If not set, then
- this will default to the current time.
- timeout: float, the number of seconds to wait for bugreport to
- complete, default is 5min.
- destination: string, path to the directory where the bugreport
- should be saved.
+ Args:
+ server_host: Address of the iperf server.
+ extra_args: A string representing extra arguments for iperf client,
+ e.g. '-i 1 -t 30'.
- Returns:
- A string that is the absolute path to the bug report on the host.
- """
- prefix = DEFAULT_BUG_REPORT_NAME
- if test_name:
- prefix = '%s,%s' % (DEFAULT_BUG_REPORT_NAME, test_name)
- if begin_time is None:
- begin_time = mobly_logger.get_log_file_timestamp()
+ Returns:
+ status: true if iperf client start successfully.
+ results: results have data flow information
+ """
+ out = self.adb.shell('iperf3 -c %s %s' % (server_host, extra_args))
+ clean_out = str(out, 'utf-8').strip().split('\n')
+ if 'error' in clean_out[0].lower():
+ return False, clean_out
+ return True, clean_out
- new_br = True
- try:
- stdout = self.adb.shell('bugreportz -v').decode('utf-8')
- # This check is necessary for builds before N, where adb shell's ret
- # code and stderr are not propagated properly.
- if 'not found' in stdout:
- new_br = False
- except adb.AdbError:
- new_br = False
+ def wait_for_boot_completion(
+ self, timeout=DEFAULT_TIMEOUT_BOOT_COMPLETION_SECOND):
+ """Waits for Android framework to broadcast ACTION_BOOT_COMPLETED.
- if destination is None:
- destination = os.path.join(self.log_path, 'BugReports')
- br_path = utils.abs_path(destination)
- utils.create_dir(br_path)
- filename = self.generate_filename(prefix, str(begin_time), 'txt')
- if new_br:
- filename = filename.replace('.txt', '.zip')
- full_out_path = os.path.join(br_path, filename)
- # in case device restarted, wait for adb interface to return
- self.wait_for_boot_completion()
- self.log.debug('Start taking bugreport.')
- if new_br:
- out = self.adb.shell('bugreportz', timeout=timeout).decode('utf-8')
- if not out.startswith('OK'):
- raise DeviceError(self, 'Failed to take bugreport: %s' % out)
- br_out_path = out.split(':')[1].strip()
- self.adb.pull([br_out_path, full_out_path])
- self.adb.shell(['rm', br_out_path])
- else:
- # shell=True as this command redirects the stdout to a local file
- # using shell redirection.
- self.adb.bugreport(' > "%s"' % full_out_path,
- shell=True,
- timeout=timeout)
- self.log.debug('Bugreport taken at %s.', full_out_path)
- return full_out_path
+ This function times out after 15 minutes.
- def take_screenshot(self, destination):
- """Takes a screenshot of the device.
+ Args:
+ timeout: float, the number of seconds to wait before timing out.
+ If not specified, no timeout takes effect.
+ """
+ timeout_start = time.time()
- Args:
- destination: string, full path to the directory to save in.
+ self.adb.wait_for_device(timeout=timeout)
+ while time.time() < timeout_start + timeout:
+ try:
+ if self.is_boot_completed():
+ return
+ except (adb.AdbError, adb.AdbTimeoutError):
+ # adb shell calls may fail during certain period of booting
+ # process, which is normal. Ignoring these errors.
+ pass
+ time.sleep(5)
+ raise DeviceError(self, 'Booting process timed out')
- Returns:
- string, full path to the screenshot file on the host.
- """
- filename = self.generate_filename('screenshot', extension_name='png')
- device_path = os.path.join('/storage/emulated/0/', filename)
- self.adb.shell(['screencap', '-p', device_path],
- timeout=TAKE_SCREENSHOT_TIMEOUT_SECOND)
- utils.create_dir(destination)
- self.adb.pull([device_path, destination])
- pic_path = os.path.join(destination, filename)
- self.log.debug('Screenshot taken, saved on the host: %s', pic_path)
- self.adb.shell(['rm', device_path])
- return pic_path
+ def is_boot_completed(self):
+ """Checks if device boot is completed by verifying system property."""
+ completed = self.adb.getprop('sys.boot_completed')
+ if completed == '1':
+ self.log.debug('Device boot completed.')
+ return True
+ return False
- def run_iperf_client(self, server_host, extra_args=''):
- """Start iperf client on the device.
+ def is_adb_detectable(self):
+ """Checks if USB is on and device is ready by verifying adb devices."""
+ serials = list_adb_devices()
+ if self.serial in serials:
+ self.log.debug('Is now adb detectable.')
+ return True
+ return False
- Return status as true if iperf client start successfully.
- And data flow information as results.
+ def reboot(self):
+ """Reboots the device.
- Args:
- server_host: Address of the iperf server.
- extra_args: A string representing extra arguments for iperf client,
- e.g. '-i 1 -t 30'.
+ Generally one should use this method to reboot the device instead of
+ directly calling `adb.reboot`. Because this method gracefully handles
+ the teardown and restoration of running services.
- Returns:
- status: true if iperf client start successfully.
- results: results have data flow information
- """
- out = self.adb.shell('iperf3 -c %s %s' % (server_host, extra_args))
- clean_out = str(out, 'utf-8').strip().split('\n')
- if 'error' in clean_out[0].lower():
- return False, clean_out
- return True, clean_out
+ This method is blocking and only returns when the reboot has completed
+ and the services restored.
- def wait_for_boot_completion(
- self, timeout=DEFAULT_TIMEOUT_BOOT_COMPLETION_SECOND):
- """Waits for Android framework to broadcast ACTION_BOOT_COMPLETED.
+ Raises:
+ Error: Waiting for completion timed out.
+ """
+ if self.is_bootloader:
+ self.fastboot.reboot()
+ return
+ with self.handle_reboot():
+ self.adb.reboot()
- This function times out after 15 minutes.
+ def __getattr__(self, name):
+ """Tries to return a snippet client registered with `name`.
- Args:
- timeout: float, the number of seconds to wait before timing out.
- If not specified, no timeout takes effect.
- """
- timeout_start = time.time()
-
- self.adb.wait_for_device(timeout=timeout)
- while time.time() < timeout_start + timeout:
- try:
- if self.is_boot_completed():
- return
- except (adb.AdbError, adb.AdbTimeoutError):
- # adb shell calls may fail during certain period of booting
- # process, which is normal. Ignoring these errors.
- pass
- time.sleep(5)
- raise DeviceError(self, 'Booting process timed out')
-
- def is_boot_completed(self):
- """Checks if device boot is completed by verifying system property."""
- completed = self.adb.getprop('sys.boot_completed')
- if completed == '1':
- self.log.debug('Device boot completed.')
- return True
- return False
-
- def is_adb_detectable(self):
- """Checks if USB is on and device is ready by verifying adb devices."""
- serials = list_adb_devices()
- if self.serial in serials:
- self.log.debug('Is now adb detectable.')
- return True
- return False
-
- def reboot(self):
- """Reboots the device.
-
- Generally one should use this method to reboot the device instead of
- directly calling `adb.reboot`. Because this method gracefully handles
- the teardown and restoration of running services.
-
- This method is blocking and only returns when the reboot has completed
- and the services restored.
-
- Raises:
- Error: Waiting for completion timed out.
- """
- if self.is_bootloader:
- self.fastboot.reboot()
- return
- with self.handle_reboot():
- self.adb.reboot()
-
- def __getattr__(self, name):
- """Tries to return a snippet client registered with `name`.
-
- This is for backward compatibility of direct accessing snippet clients.
- """
- client = self.services.snippets.get_snippet_client(name)
- if client:
- return client
- return self.__getattribute__(name)
+ This is for backward compatibility of direct accessing snippet clients.
+ """
+ client = self.services.snippets.get_snippet_client(name)
+ if client:
+ return client
+ return self.__getattribute__(name)
# Properties in AndroidDevice that have setters.
@@ -1124,20 +1124,20 @@
class AndroidDeviceLoggerAdapter(logging.LoggerAdapter):
- """A wrapper class that adds a prefix to each log line.
+ """A wrapper class that adds a prefix to each log line.
- Usage:
+ Usage:
- .. code-block:: python
+ .. code-block:: python
- my_log = AndroidDeviceLoggerAdapter(logging.getLogger(), {
- 'tag': <custom tag>
- })
+ my_log = AndroidDeviceLoggerAdapter(logging.getLogger(), {
+ 'tag': <custom tag>
+ })
- Then each log line added by my_log will have a prefix
- '[AndroidDevice|<tag>]'
- """
+ Then each log line added by my_log will have a prefix
+ '[AndroidDevice|<tag>]'
+ """
- def process(self, msg, kwargs):
- msg = _DEBUG_PREFIX_TEMPLATE % (self.extra['tag'], msg)
- return (msg, kwargs)
+ def process(self, msg, kwargs):
+ msg = _DEBUG_PREFIX_TEMPLATE % (self.extra['tag'], msg)
+ return (msg, kwargs)
diff --git a/mobly/controllers/android_device_lib/adb.py b/mobly/controllers/android_device_lib/adb.py
index fbcfd7d..43a4daa 100644
--- a/mobly/controllers/android_device_lib/adb.py
+++ b/mobly/controllers/android_device_lib/adb.py
@@ -43,477 +43,477 @@
class Error(Exception):
- """Base error type for adb proxy module."""
+ """Base error type for adb proxy module."""
class AdbError(Error):
- """Raised when an adb command encounters an error.
+ """Raised when an adb command encounters an error.
- Args:
- cmd: list of strings, the adb command executed.
- stdout: byte string, the raw stdout of the command.
- stderr: byte string, the raw stderr of the command.
- ret_code: int, the return code of the command.
- serial: string, the serial of the device the command is executed on.
- This is an empty string if the adb command is not specific to a
- device.
- """
+ Args:
+ cmd: list of strings, the adb command executed.
+ stdout: byte string, the raw stdout of the command.
+ stderr: byte string, the raw stderr of the command.
+ ret_code: int, the return code of the command.
+ serial: string, the serial of the device the command is executed on.
+ This is an empty string if the adb command is not specific to a
+ device.
+ """
- def __init__(self, cmd, stdout, stderr, ret_code, serial=''):
- self.cmd = cmd
- self.stdout = stdout
- self.stderr = stderr
- self.ret_code = ret_code
- self.serial = serial
+ def __init__(self, cmd, stdout, stderr, ret_code, serial=''):
+ self.cmd = cmd
+ self.stdout = stdout
+ self.stderr = stderr
+ self.ret_code = ret_code
+ self.serial = serial
- def __str__(self):
- return ('Error executing adb cmd "%s". ret: %d, stdout: %s, stderr: %s'
- ) % (utils.cli_cmd_to_string(
- self.cmd), self.ret_code, self.stdout, self.stderr)
+ def __str__(self):
+ return ('Error executing adb cmd "%s". ret: %d, stdout: %s, stderr: %s'
+ ) % (utils.cli_cmd_to_string(
+ self.cmd), self.ret_code, self.stdout, self.stderr)
class AdbTimeoutError(Error):
- """Raised when an command did not complete within expected time.
+ """Raised when an command did not complete within expected time.
- Args:
- cmd: list of strings, the adb command that timed out
- timeout: float, the number of seconds passed before timing out.
- serial: string, the serial of the device the command is executed on.
- This is an empty string if the adb command is not specific to a
- device.
- """
+ Args:
+ cmd: list of strings, the adb command that timed out
+ timeout: float, the number of seconds passed before timing out.
+ serial: string, the serial of the device the command is executed on.
+ This is an empty string if the adb command is not specific to a
+ device.
+ """
- def __init__(self, cmd, timeout, serial=''):
- self.cmd = cmd
- self.timeout = timeout
- self.serial = serial
+ def __init__(self, cmd, timeout, serial=''):
+ self.cmd = cmd
+ self.timeout = timeout
+ self.serial = serial
- def __str__(self):
- return 'Timed out executing command "%s" after %ss.' % (
- utils.cli_cmd_to_string(self.cmd), self.timeout)
+ def __str__(self):
+ return 'Timed out executing command "%s" after %ss.' % (
+ utils.cli_cmd_to_string(self.cmd), self.timeout)
def list_occupied_adb_ports():
- """Lists all the host ports occupied by adb forward.
+ """Lists all the host ports occupied by adb forward.
- This is useful because adb will silently override the binding if an attempt
- to bind to a port already used by adb was made, instead of throwing binding
- error. So one should always check what ports adb is using before trying to
- bind to a port with adb.
+ This is useful because adb will silently override the binding if an attempt
+ to bind to a port already used by adb was made, instead of throwing binding
+ error. So one should always check what ports adb is using before trying to
+ bind to a port with adb.
- Returns:
- A list of integers representing occupied host ports.
- """
- out = AdbProxy().forward('--list')
- clean_lines = str(out, 'utf-8').strip().split('\n')
- used_ports = []
- for line in clean_lines:
- tokens = line.split(' tcp:')
- if len(tokens) != 3:
- continue
- used_ports.append(int(tokens[1]))
- return used_ports
+ Returns:
+ A list of integers representing occupied host ports.
+ """
+ out = AdbProxy().forward('--list')
+ clean_lines = str(out, 'utf-8').strip().split('\n')
+ used_ports = []
+ for line in clean_lines:
+ tokens = line.split(' tcp:')
+ if len(tokens) != 3:
+ continue
+ used_ports.append(int(tokens[1]))
+ return used_ports
class AdbProxy(object):
- """Proxy class for ADB.
+ """Proxy class for ADB.
- For syntactic reasons, the '-' in adb commands need to be replaced with
- '_'. Can directly execute adb commands on an object:
- >> adb = AdbProxy(<serial>)
- >> adb.start_server()
- >> adb.devices() # will return the console output of "adb devices".
+ For syntactic reasons, the '-' in adb commands need to be replaced with
+ '_'. Can directly execute adb commands on an object:
+ >> adb = AdbProxy(<serial>)
+ >> adb.start_server()
+ >> adb.devices() # will return the console output of "adb devices".
- By default, command args are expected to be an iterable which is passed
- directly to subprocess.Popen():
- >> adb.shell(['echo', 'a', 'b'])
+ By default, command args are expected to be an iterable which is passed
+ directly to subprocess.Popen():
+ >> adb.shell(['echo', 'a', 'b'])
- This way of launching commands is recommended by the subprocess
- documentation to avoid shell injection vulnerabilities and avoid having to
- deal with multiple layers of shell quoting and different shell environments
- between different OSes.
+ This way of launching commands is recommended by the subprocess
+ documentation to avoid shell injection vulnerabilities and avoid having to
+ deal with multiple layers of shell quoting and different shell environments
+ between different OSes.
- If you really want to run the command through the system shell, this is
- possible by supplying shell=True, but try to avoid this if possible:
- >> adb.shell('cat /foo > /tmp/file', shell=True)
+ If you really want to run the command through the system shell, this is
+ possible by supplying shell=True, but try to avoid this if possible:
+ >> adb.shell('cat /foo > /tmp/file', shell=True)
+ """
+
+ def __init__(self, serial=''):
+ self.serial = serial
+
+ def _exec_cmd(self, args, shell, timeout, stderr):
+ """Executes adb commands.
+
+ Args:
+ args: string or list of strings, program arguments.
+ See subprocess.Popen() documentation.
+ shell: bool, True to run this command through the system shell,
+ False to invoke it directly. See subprocess.Popen() docs.
+ timeout: float, the number of seconds to wait before timing out.
+ If not specified, no timeout takes effect.
+ stderr: a Byte stream, like io.BytesIO, stderr of the command will
+ be written to this object if provided.
+
+ Returns:
+ The output of the adb command run if exit code is 0.
+
+ Raises:
+ ValueError: timeout value is invalid.
+ AdbError: The adb command exit code is not 0.
+ AdbTimeoutError: The adb command timed out.
"""
+ if timeout and timeout <= 0:
+ raise ValueError('Timeout is not a positive value: %s' % timeout)
+ try:
+ (ret, out, err) = utils.run_command(args,
+ shell=shell,
+ timeout=timeout)
+ except psutil.TimeoutExpired:
+ raise AdbTimeoutError(cmd=args,
+ timeout=timeout,
+ serial=self.serial)
- def __init__(self, serial=''):
- self.serial = serial
+ if stderr:
+ stderr.write(err)
+ logging.debug('cmd: %s, stdout: %s, stderr: %s, ret: %s',
+ utils.cli_cmd_to_string(args), out, err, ret)
+ if ret == 0:
+ return out
+ else:
+ raise AdbError(cmd=args,
+ stdout=out,
+ stderr=err,
+ ret_code=ret,
+ serial=self.serial)
- def _exec_cmd(self, args, shell, timeout, stderr):
- """Executes adb commands.
+ def _execute_and_process_stdout(self, args, shell, handler):
+ """Executes adb commands and processes the stdout with a handler.
- Args:
- args: string or list of strings, program arguments.
- See subprocess.Popen() documentation.
- shell: bool, True to run this command through the system shell,
- False to invoke it directly. See subprocess.Popen() docs.
- timeout: float, the number of seconds to wait before timing out.
- If not specified, no timeout takes effect.
- stderr: a Byte stream, like io.BytesIO, stderr of the command will
- be written to this object if provided.
+ Args:
+ args: string or list of strings, program arguments.
+ See subprocess.Popen() documentation.
+ shell: bool, True to run this command through the system shell,
+ False to invoke it directly. See subprocess.Popen() docs.
+ handler: func, a function to handle adb stdout line by line.
- Returns:
- The output of the adb command run if exit code is 0.
+ Returns:
+ The stderr of the adb command run if exit code is 0.
- Raises:
- ValueError: timeout value is invalid.
- AdbError: The adb command exit code is not 0.
- AdbTimeoutError: The adb command timed out.
- """
- if timeout and timeout <= 0:
- raise ValueError('Timeout is not a positive value: %s' % timeout)
- try:
- (ret, out, err) = utils.run_command(args,
- shell=shell,
- timeout=timeout)
- except psutil.TimeoutExpired:
- raise AdbTimeoutError(cmd=args,
- timeout=timeout,
- serial=self.serial)
-
- if stderr:
- stderr.write(err)
- logging.debug('cmd: %s, stdout: %s, stderr: %s, ret: %s',
- utils.cli_cmd_to_string(args), out, err, ret)
- if ret == 0:
- return out
+ Raises:
+ AdbError: The adb command exit code is not 0.
+ """
+ proc = subprocess.Popen(args,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ shell=shell,
+ bufsize=1)
+ out = '[elided, processed via handler]'
+ try:
+ # Even if the process dies, stdout.readline still works
+ # and will continue until it runs out of stdout to process.
+ while True:
+ line = proc.stdout.readline()
+ if line:
+ handler(line)
else:
- raise AdbError(cmd=args,
- stdout=out,
- stderr=err,
- ret_code=ret,
- serial=self.serial)
+ break
+ finally:
+ # Note, communicate will not contain any buffered output.
+ (unexpected_out, err) = proc.communicate()
+ if unexpected_out:
+ out = '[unexpected stdout] %s' % unexpected_out
+ for line in unexpected_out.splitlines():
+ handler(line)
- def _execute_and_process_stdout(self, args, shell, handler):
- """Executes adb commands and processes the stdout with a handler.
+ ret = proc.returncode
+ logging.debug('cmd: %s, stdout: %s, stderr: %s, ret: %s',
+ utils.cli_cmd_to_string(args), out, err, ret)
+ if ret == 0:
+ return err
+ else:
+ raise AdbError(cmd=args, stdout=out, stderr=err, ret_code=ret)
- Args:
- args: string or list of strings, program arguments.
- See subprocess.Popen() documentation.
- shell: bool, True to run this command through the system shell,
- False to invoke it directly. See subprocess.Popen() docs.
- handler: func, a function to handle adb stdout line by line.
+ def _construct_adb_cmd(self, raw_name, args, shell):
+ """Constructs an adb command with arguments for a subprocess call.
- Returns:
- The stderr of the adb command run if exit code is 0.
+ Args:
+ raw_name: string, the raw unsanitized name of the adb command to
+ format.
+ args: string or list of strings, arguments to the adb command.
+ See subprocess.Proc() documentation.
+ shell: bool, True to run this command through the system shell,
+ False to invoke it directly. See subprocess.Proc() docs.
- Raises:
- AdbError: The adb command exit code is not 0.
- """
- proc = subprocess.Popen(args,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- shell=shell,
- bufsize=1)
- out = '[elided, processed via handler]'
- try:
- # Even if the process dies, stdout.readline still works
- # and will continue until it runs out of stdout to process.
- while True:
- line = proc.stdout.readline()
- if line:
- handler(line)
- else:
- break
- finally:
- # Note, communicate will not contain any buffered output.
- (unexpected_out, err) = proc.communicate()
- if unexpected_out:
- out = '[unexpected stdout] %s' % unexpected_out
- for line in unexpected_out.splitlines():
- handler(line)
-
- ret = proc.returncode
- logging.debug('cmd: %s, stdout: %s, stderr: %s, ret: %s',
- utils.cli_cmd_to_string(args), out, err, ret)
- if ret == 0:
- return err
+ Returns:
+ The adb command in a format appropriate for subprocess. If shell is
+ True, then this is a string; otherwise, this is a list of
+ strings.
+ """
+ args = args or ''
+ name = raw_name.replace('_', '-')
+ if shell:
+ args = utils.cli_cmd_to_string(args)
+ # Add quotes around "adb" in case the ADB path contains spaces. This
+ # is pretty common on Windows (e.g. Program Files).
+ if self.serial:
+ adb_cmd = '"%s" -s "%s" %s %s' % (ADB, self.serial, name, args)
+ else:
+ adb_cmd = '"%s" %s %s' % (ADB, name, args)
+ else:
+ adb_cmd = [ADB]
+ if self.serial:
+ adb_cmd.extend(['-s', self.serial])
+ adb_cmd.append(name)
+ if args:
+ if isinstance(args, str):
+ adb_cmd.append(args)
else:
- raise AdbError(cmd=args, stdout=out, stderr=err, ret_code=ret)
+ adb_cmd.extend(args)
+ return adb_cmd
- def _construct_adb_cmd(self, raw_name, args, shell):
- """Constructs an adb command with arguments for a subprocess call.
+ def _exec_adb_cmd(self, name, args, shell, timeout, stderr):
+ adb_cmd = self._construct_adb_cmd(name, args, shell=shell)
+ out = self._exec_cmd(adb_cmd,
+ shell=shell,
+ timeout=timeout,
+ stderr=stderr)
+ return out
- Args:
- raw_name: string, the raw unsanitized name of the adb command to
- format.
- args: string or list of strings, arguments to the adb command.
- See subprocess.Proc() documentation.
- shell: bool, True to run this command through the system shell,
- False to invoke it directly. See subprocess.Proc() docs.
+ def _execute_adb_and_process_stdout(self, name, args, shell, handler):
+ adb_cmd = self._construct_adb_cmd(name, args, shell=shell)
+ err = self._execute_and_process_stdout(adb_cmd,
+ shell=shell,
+ handler=handler)
+ return err
- Returns:
- The adb command in a format appropriate for subprocess. If shell is
- True, then this is a string; otherwise, this is a list of
- strings.
- """
- args = args or ''
- name = raw_name.replace('_', '-')
- if shell:
- args = utils.cli_cmd_to_string(args)
- # Add quotes around "adb" in case the ADB path contains spaces. This
- # is pretty common on Windows (e.g. Program Files).
- if self.serial:
- adb_cmd = '"%s" -s "%s" %s %s' % (ADB, self.serial, name, args)
- else:
- adb_cmd = '"%s" %s %s' % (ADB, name, args)
+ def _parse_getprop_output(self, output):
+ """Parses the raw output of `adb shell getprop` into a dictionary.
+
+ Args:
+ output: byte str, the raw output of the `adb shell getprop` call.
+
+ Returns:
+ dict, name-value pairs of the properties.
+ """
+ output = output.decode('utf-8', errors='ignore').replace('\r\n', '\n')
+ results = {}
+ for line in output.split(']\n'):
+ if not line:
+ continue
+ try:
+ name, value = line.split(': ', 1)
+ except ValueError:
+ logging.debug('Failed to parse adb getprop line %s', line)
+ continue
+ name = name.strip()[1:-1]
+ # Remove any square bracket from either end of the value string.
+ if value and value[0] == '[':
+ value = value[1:]
+ results[name] = value
+ return results
+
+ @property
+ def current_user_id(self):
+ """The integer ID of the current Android user.
+
+ Some adb commands require specifying a user ID to work properly. Use
+ this to get the current user ID.
+
+ Note a "user" is not the same as an "account" in Android. See AOSP's
+ documentation for details.
+ https://source.android.com/devices/tech/admin/multi-user
+ """
+ sdk_int = int(self.getprop('ro.build.version.sdk'))
+ if sdk_int >= 24:
+ return int(self.shell(['am', 'get-current-user']))
+ if sdk_int >= 21:
+ user_info_str = self.shell(['dumpsys', 'user']).decode('utf-8')
+ return int(re.findall(r'\{(\d+):', user_info_str)[0])
+ # Multi-user is not supported in SDK < 21, only user 0 exists.
+ return 0
+
+ def getprop(self, prop_name):
+ """Get a property of the device.
+
+ This is a convenience wrapper for `adb shell getprop xxx`.
+
+ Args:
+ prop_name: A string that is the name of the property to get.
+
+ Returns:
+ A string that is the value of the property, or None if the property
+ doesn't exist.
+ """
+ return self.shell(
+ ['getprop', prop_name],
+ timeout=DEFAULT_GETPROP_TIMEOUT_SEC).decode('utf-8').strip()
+
+ def getprops(self, prop_names):
+ """Get multiple properties of the device.
+
+ This is a convenience wrapper for `adb shell getprop`. Use this to
+ reduce the number of adb calls when getting multiple properties.
+
+ Args:
+ prop_names: list of strings, the names of the properties to get.
+
+ Returns:
+ A dict containing name-value pairs of the properties requested, if
+ they exist.
+ """
+ attempts = DEFAULT_GETPROPS_ATTEMPTS
+ results = {}
+ for attempt in range(attempts):
+ # The ADB getprop command can randomly return empty string, so try
+ # multiple times. This value should always be non-empty if the device
+ # in a working state.
+ raw_output = self.shell(['getprop'],
+ timeout=DEFAULT_GETPROP_TIMEOUT_SEC)
+ properties = self._parse_getprop_output(raw_output)
+ if properties:
+ for name in prop_names:
+ if name in properties:
+ results[name] = properties[name]
+ break
+ # Don't call sleep on the last attempt.
+ if attempt < attempts - 1:
+ time.sleep(DEFAULT_GETPROPS_RETRY_SLEEP_SEC)
+ return results
+
+ def has_shell_command(self, command):
+ """Checks to see if a given check command exists on the device.
+
+ Args:
+ command: A string that is the name of the command to check.
+
+ Returns:
+ A boolean that is True if the command exists and False otherwise.
+ """
+ try:
+ output = self.shell(['command', '-v',
+ command]).decode('utf-8').strip()
+ return command in output
+ except AdbError:
+ # If the command doesn't exist, then 'command -v' can return
+ # an exit code > 1.
+ return False
+
+ def forward(self, args=None, shell=False):
+ with ADB_PORT_LOCK:
+ return self._exec_adb_cmd('forward',
+ args,
+ shell,
+ timeout=None,
+ stderr=None)
+
+ def instrument(self, package, options=None, runner=None, handler=None):
+ """Runs an instrumentation command on the device.
+
+ This is a convenience wrapper to avoid parameter formatting.
+
+ Example:
+
+ .. code-block:: python
+
+ device.instrument(
+ 'com.my.package.test',
+ options = {
+ 'class': 'com.my.package.test.TestSuite',
+ },
+ )
+
+ Args:
+ package: string, the package of the instrumentation tests.
+ options: dict, the instrumentation options including the test
+ class.
+ runner: string, the test runner name, which defaults to
+ DEFAULT_INSTRUMENTATION_RUNNER.
+ handler: optional func, when specified the function is used to parse
+ the instrumentation stdout line by line as the output is
+ generated; otherwise, the stdout is simply returned once the
+ instrumentation is finished.
+
+ Returns:
+ The stdout of instrumentation command or the stderr if the handler
+ is set.
+ """
+ if runner is None:
+ runner = DEFAULT_INSTRUMENTATION_RUNNER
+ if options is None:
+ options = {}
+
+ options_list = []
+ for option_key, option_value in options.items():
+ options_list.append('-e %s %s' % (option_key, option_value))
+ options_string = ' '.join(options_list)
+
+ instrumentation_command = 'am instrument -r -w %s %s/%s' % (
+ options_string, package, runner)
+ logging.info('AndroidDevice|%s: Executing adb shell %s', self.serial,
+ instrumentation_command)
+ if handler is None:
+ return self._exec_adb_cmd('shell',
+ instrumentation_command,
+ shell=False,
+ timeout=None,
+ stderr=None)
+ else:
+ return self._execute_adb_and_process_stdout(
+ 'shell', instrumentation_command, shell=False, handler=handler)
+
+ def root(self):
+ """Enables ADB root mode on the device.
+
+ This method will retry to execute the command `adb root` when an
+ AdbError occurs, since sometimes the error `adb: unable to connect
+ for root: closed` is raised when executing `adb root` immediately after
+ the device is booted to OS.
+
+ Returns:
+ A string that is the stdout of root command.
+
+ Raises:
+ AdbError: If the command exit code is not 0.
+ """
+ for attempt in range(ADB_ROOT_RETRY_ATTMEPTS):
+ try:
+ return self._exec_adb_cmd('root',
+ args=None,
+ shell=False,
+ timeout=None,
+ stderr=None)
+ except AdbError as e:
+ if attempt + 1 < ADB_ROOT_RETRY_ATTMEPTS:
+ logging.debug(
+ 'Retry the command "%s" since Error "%s" occurred.' %
+ (utils.cli_cmd_to_string(e.cmd),
+ e.stderr.decode('utf-8').strip()))
+ # Buffer between "adb root" commands.
+ time.sleep(ADB_ROOT_RETRY_ATTEMPT_INTERVAL_SEC)
else:
- adb_cmd = [ADB]
- if self.serial:
- adb_cmd.extend(['-s', self.serial])
- adb_cmd.append(name)
- if args:
- if isinstance(args, str):
- adb_cmd.append(args)
- else:
- adb_cmd.extend(args)
- return adb_cmd
+ raise e
- def _exec_adb_cmd(self, name, args, shell, timeout, stderr):
- adb_cmd = self._construct_adb_cmd(name, args, shell=shell)
- out = self._exec_cmd(adb_cmd,
- shell=shell,
- timeout=timeout,
- stderr=stderr)
- return out
+ def __getattr__(self, name):
+ def adb_call(args=None, shell=False, timeout=None, stderr=None):
+ """Wrapper for an ADB command.
- def _execute_adb_and_process_stdout(self, name, args, shell, handler):
- adb_cmd = self._construct_adb_cmd(name, args, shell=shell)
- err = self._execute_and_process_stdout(adb_cmd,
- shell=shell,
- handler=handler)
- return err
+ Args:
+ args: string or list of strings, arguments to the adb command.
+ See subprocess.Proc() documentation.
+ shell: bool, True to run this command through the system shell,
+ False to invoke it directly. See subprocess.Proc() docs.
+ timeout: float, the number of seconds to wait before timing out.
+ If not specified, no timeout takes effect.
+ stderr: a Byte stream, like io.BytesIO, stderr of the command
+ will be written to this object if provided.
- def _parse_getprop_output(self, output):
- """Parses the raw output of `adb shell getprop` into a dictionary.
+ Returns:
+ The output of the adb command run if exit code is 0.
+ """
+ return self._exec_adb_cmd(name,
+ args,
+ shell=shell,
+ timeout=timeout,
+ stderr=stderr)
- Args:
- output: byte str, the raw output of the `adb shell getprop` call.
-
- Returns:
- dict, name-value pairs of the properties.
- """
- output = output.decode('utf-8', errors='ignore').replace('\r\n', '\n')
- results = {}
- for line in output.split(']\n'):
- if not line:
- continue
- try:
- name, value = line.split(': ', 1)
- except ValueError:
- logging.debug('Failed to parse adb getprop line %s', line)
- continue
- name = name.strip()[1:-1]
- # Remove any square bracket from either end of the value string.
- if value and value[0] == '[':
- value = value[1:]
- results[name] = value
- return results
-
- @property
- def current_user_id(self):
- """The integer ID of the current Android user.
-
- Some adb commands require specifying a user ID to work properly. Use
- this to get the current user ID.
-
- Note a "user" is not the same as an "account" in Android. See AOSP's
- documentation for details.
- https://source.android.com/devices/tech/admin/multi-user
- """
- sdk_int = int(self.getprop('ro.build.version.sdk'))
- if sdk_int >= 24:
- return int(self.shell(['am', 'get-current-user']))
- if sdk_int >= 21:
- user_info_str = self.shell(['dumpsys', 'user']).decode('utf-8')
- return int(re.findall(r'\{(\d+):', user_info_str)[0])
- # Multi-user is not supported in SDK < 21, only user 0 exists.
- return 0
-
- def getprop(self, prop_name):
- """Get a property of the device.
-
- This is a convenience wrapper for `adb shell getprop xxx`.
-
- Args:
- prop_name: A string that is the name of the property to get.
-
- Returns:
- A string that is the value of the property, or None if the property
- doesn't exist.
- """
- return self.shell(
- ['getprop', prop_name],
- timeout=DEFAULT_GETPROP_TIMEOUT_SEC).decode('utf-8').strip()
-
- def getprops(self, prop_names):
- """Get multiple properties of the device.
-
- This is a convenience wrapper for `adb shell getprop`. Use this to
- reduce the number of adb calls when getting multiple properties.
-
- Args:
- prop_names: list of strings, the names of the properties to get.
-
- Returns:
- A dict containing name-value pairs of the properties requested, if
- they exist.
- """
- attempts = DEFAULT_GETPROPS_ATTEMPTS
- results = {}
- for attempt in range(attempts):
- # The ADB getprop command can randomly return empty string, so try
- # multiple times. This value should always be non-empty if the device
- # in a working state.
- raw_output = self.shell(['getprop'],
- timeout=DEFAULT_GETPROP_TIMEOUT_SEC)
- properties = self._parse_getprop_output(raw_output)
- if properties:
- for name in prop_names:
- if name in properties:
- results[name] = properties[name]
- break
- # Don't call sleep on the last attempt.
- if attempt < attempts - 1:
- time.sleep(DEFAULT_GETPROPS_RETRY_SLEEP_SEC)
- return results
-
- def has_shell_command(self, command):
- """Checks to see if a given check command exists on the device.
-
- Args:
- command: A string that is the name of the command to check.
-
- Returns:
- A boolean that is True if the command exists and False otherwise.
- """
- try:
- output = self.shell(['command', '-v',
- command]).decode('utf-8').strip()
- return command in output
- except AdbError:
- # If the command doesn't exist, then 'command -v' can return
- # an exit code > 1.
- return False
-
- def forward(self, args=None, shell=False):
- with ADB_PORT_LOCK:
- return self._exec_adb_cmd('forward',
- args,
- shell,
- timeout=None,
- stderr=None)
-
- def instrument(self, package, options=None, runner=None, handler=None):
- """Runs an instrumentation command on the device.
-
- This is a convenience wrapper to avoid parameter formatting.
-
- Example:
-
- .. code-block:: python
-
- device.instrument(
- 'com.my.package.test',
- options = {
- 'class': 'com.my.package.test.TestSuite',
- },
- )
-
- Args:
- package: string, the package of the instrumentation tests.
- options: dict, the instrumentation options including the test
- class.
- runner: string, the test runner name, which defaults to
- DEFAULT_INSTRUMENTATION_RUNNER.
- handler: optional func, when specified the function is used to parse
- the instrumentation stdout line by line as the output is
- generated; otherwise, the stdout is simply returned once the
- instrumentation is finished.
-
- Returns:
- The stdout of instrumentation command or the stderr if the handler
- is set.
- """
- if runner is None:
- runner = DEFAULT_INSTRUMENTATION_RUNNER
- if options is None:
- options = {}
-
- options_list = []
- for option_key, option_value in options.items():
- options_list.append('-e %s %s' % (option_key, option_value))
- options_string = ' '.join(options_list)
-
- instrumentation_command = 'am instrument -r -w %s %s/%s' % (
- options_string, package, runner)
- logging.info('AndroidDevice|%s: Executing adb shell %s', self.serial,
- instrumentation_command)
- if handler is None:
- return self._exec_adb_cmd('shell',
- instrumentation_command,
- shell=False,
- timeout=None,
- stderr=None)
- else:
- return self._execute_adb_and_process_stdout(
- 'shell', instrumentation_command, shell=False, handler=handler)
-
- def root(self):
- """Enables ADB root mode on the device.
-
- This method will retry to execute the command `adb root` when an
- AdbError occurs, since sometimes the error `adb: unable to connect
- for root: closed` is raised when executing `adb root` immediately after
- the device is booted to OS.
-
- Returns:
- A string that is the stdout of root command.
-
- Raises:
- AdbError: If the command exit code is not 0.
- """
- for attempt in range(ADB_ROOT_RETRY_ATTMEPTS):
- try:
- return self._exec_adb_cmd('root',
- args=None,
- shell=False,
- timeout=None,
- stderr=None)
- except AdbError as e:
- if attempt + 1 < ADB_ROOT_RETRY_ATTMEPTS:
- logging.debug(
- 'Retry the command "%s" since Error "%s" occurred.' %
- (utils.cli_cmd_to_string(e.cmd),
- e.stderr.decode('utf-8').strip()))
- # Buffer between "adb root" commands.
- time.sleep(ADB_ROOT_RETRY_ATTEMPT_INTERVAL_SEC)
- else:
- raise e
-
- def __getattr__(self, name):
- def adb_call(args=None, shell=False, timeout=None, stderr=None):
- """Wrapper for an ADB command.
-
- Args:
- args: string or list of strings, arguments to the adb command.
- See subprocess.Proc() documentation.
- shell: bool, True to run this command through the system shell,
- False to invoke it directly. See subprocess.Proc() docs.
- timeout: float, the number of seconds to wait before timing out.
- If not specified, no timeout takes effect.
- stderr: a Byte stream, like io.BytesIO, stderr of the command
- will be written to this object if provided.
-
- Returns:
- The output of the adb command run if exit code is 0.
- """
- return self._exec_adb_cmd(name,
- args,
- shell=shell,
- timeout=timeout,
- stderr=stderr)
-
- return adb_call
+ return adb_call
diff --git a/mobly/controllers/android_device_lib/callback_handler.py b/mobly/controllers/android_device_lib/callback_handler.py
index 868c8da..8eb0869 100644
--- a/mobly/controllers/android_device_lib/callback_handler.py
+++ b/mobly/controllers/android_device_lib/callback_handler.py
@@ -25,146 +25,146 @@
class Error(errors.DeviceError):
- pass
+ pass
class TimeoutError(Error):
- pass
+ pass
class CallbackHandler(object):
- """The class used to handle a specific group of callback events.
+ """The class used to handle a specific group of callback events.
- All the events handled by a CallbackHandler are originally triggered by one
- async Rpc call. All the events are tagged with a callback_id specific to a
- call to an AsyncRpc method defined on the server side.
+ All the events handled by a CallbackHandler are originally triggered by one
+ async Rpc call. All the events are tagged with a callback_id specific to a
+ call to an AsyncRpc method defined on the server side.
- The raw message representing an event looks like:
+ The raw message representing an event looks like:
- .. code-block:: python
+ .. code-block:: python
- {
- 'callbackId': <string, callbackId>,
- 'name': <string, name of the event>,
- 'time': <long, epoch time of when the event was created on the
- server side>,
- 'data': <dict, extra data from the callback on the server side>
- }
+ {
+ 'callbackId': <string, callbackId>,
+ 'name': <string, name of the event>,
+ 'time': <long, epoch time of when the event was created on the
+ server side>,
+ 'data': <dict, extra data from the callback on the server side>
+ }
- Each message is then used to create a SnippetEvent object on the client
- side.
+ Each message is then used to create a SnippetEvent object on the client
+ side.
- Attributes:
- ret_value: The direct return value of the async Rpc call.
+ Attributes:
+ ret_value: The direct return value of the async Rpc call.
+ """
+
+ def __init__(self, callback_id, event_client, ret_value, method_name, ad):
+ self._id = callback_id
+ self._event_client = event_client
+ self.ret_value = ret_value
+ self._method_name = method_name
+ self._ad = ad
+
+ @property
+ def callback_id(self):
+ return self._id
+
+ def waitAndGet(self, event_name, timeout=DEFAULT_TIMEOUT):
+ """Blocks until an event of the specified name has been received and
+ return the event, or timeout.
+
+ Args:
+ event_name: string, name of the event to get.
+ timeout: float, the number of seconds to wait before giving up.
+
+ Returns:
+ SnippetEvent, the oldest entry of the specified event.
+
+ Raises:
+ Error: If the specified timeout is longer than the max timeout
+ supported.
+ TimeoutError: The expected event does not occur within time limit.
"""
-
- def __init__(self, callback_id, event_client, ret_value, method_name, ad):
- self._id = callback_id
- self._event_client = event_client
- self.ret_value = ret_value
- self._method_name = method_name
- self._ad = ad
-
- @property
- def callback_id(self):
- return self._id
-
- def waitAndGet(self, event_name, timeout=DEFAULT_TIMEOUT):
- """Blocks until an event of the specified name has been received and
- return the event, or timeout.
-
- Args:
- event_name: string, name of the event to get.
- timeout: float, the number of seconds to wait before giving up.
-
- Returns:
- SnippetEvent, the oldest entry of the specified event.
-
- Raises:
- Error: If the specified timeout is longer than the max timeout
- supported.
- TimeoutError: The expected event does not occur within time limit.
- """
- if timeout:
- if timeout > MAX_TIMEOUT:
- raise Error(
- self._ad,
- 'Specified timeout %s is longer than max timeout %s.' %
- (timeout, MAX_TIMEOUT))
- # Convert to milliseconds for java side.
- timeout_ms = int(timeout * 1000)
- try:
- raw_event = self._event_client.eventWaitAndGet(
- self._id, event_name, timeout_ms)
- except Exception as e:
- if 'EventSnippetException: timeout.' in str(e):
- raise TimeoutError(
- self._ad,
- 'Timed out after waiting %ss for event "%s" triggered by'
- ' %s (%s).' %
- (timeout, event_name, self._method_name, self._id))
- raise
- return snippet_event.from_dict(raw_event)
-
- def waitForEvent(self, event_name, predicate, timeout=DEFAULT_TIMEOUT):
- """Wait for an event of a specific name that satisfies the predicate.
-
- This call will block until the expected event has been received or time
- out.
-
- The predicate function defines the condition the event is expected to
- satisfy. It takes an event and returns True if the condition is
- satisfied, False otherwise.
-
- Note all events of the same name that are received but don't satisfy
- the predicate will be discarded and not be available for further
- consumption.
-
- Args:
- event_name: string, the name of the event to wait for.
- predicate: function, a function that takes an event (dictionary) and
- returns a bool.
- timeout: float, default is 120s.
-
- Returns:
- dictionary, the event that satisfies the predicate if received.
-
- Raises:
- TimeoutError: raised if no event that satisfies the predicate is
- received after timeout seconds.
- """
- deadline = time.time() + timeout
- while time.time() <= deadline:
- # Calculate the max timeout for the next event rpc call.
- rpc_timeout = deadline - time.time()
- if rpc_timeout < 0:
- break
- # A single RPC call cannot exceed MAX_TIMEOUT.
- rpc_timeout = min(rpc_timeout, MAX_TIMEOUT)
- try:
- event = self.waitAndGet(event_name, rpc_timeout)
- except TimeoutError:
- # Ignoring TimeoutError since we need to throw one with a more
- # specific message.
- break
- if predicate(event):
- return event
+ if timeout:
+ if timeout > MAX_TIMEOUT:
+ raise Error(
+ self._ad,
+ 'Specified timeout %s is longer than max timeout %s.' %
+ (timeout, MAX_TIMEOUT))
+ # Convert to milliseconds for java side.
+ timeout_ms = int(timeout * 1000)
+ try:
+ raw_event = self._event_client.eventWaitAndGet(
+ self._id, event_name, timeout_ms)
+ except Exception as e:
+ if 'EventSnippetException: timeout.' in str(e):
raise TimeoutError(
- self._ad,
- 'Timed out after %ss waiting for an "%s" event that satisfies the '
- 'predicate "%s".' % (timeout, event_name, predicate.__name__))
+ self._ad,
+ 'Timed out after waiting %ss for event "%s" triggered by'
+ ' %s (%s).' %
+ (timeout, event_name, self._method_name, self._id))
+ raise
+ return snippet_event.from_dict(raw_event)
- def getAll(self, event_name):
- """Gets all the events of a certain name that have been received so
- far. This is a non-blocking call.
+ def waitForEvent(self, event_name, predicate, timeout=DEFAULT_TIMEOUT):
+ """Wait for an event of a specific name that satisfies the predicate.
- Args:
- callback_id: The id of the callback.
- event_name: string, the name of the event to get.
+ This call will block until the expected event has been received or time
+ out.
- Returns:
- A list of SnippetEvent, each representing an event from the Java
- side.
- """
- raw_events = self._event_client.eventGetAll(self._id, event_name)
- return [snippet_event.from_dict(msg) for msg in raw_events]
+ The predicate function defines the condition the event is expected to
+ satisfy. It takes an event and returns True if the condition is
+ satisfied, False otherwise.
+
+ Note all events of the same name that are received but don't satisfy
+ the predicate will be discarded and not be available for further
+ consumption.
+
+ Args:
+ event_name: string, the name of the event to wait for.
+ predicate: function, a function that takes an event (dictionary) and
+ returns a bool.
+ timeout: float, default is 120s.
+
+ Returns:
+ dictionary, the event that satisfies the predicate if received.
+
+ Raises:
+ TimeoutError: raised if no event that satisfies the predicate is
+ received after timeout seconds.
+ """
+ deadline = time.time() + timeout
+ while time.time() <= deadline:
+ # Calculate the max timeout for the next event rpc call.
+ rpc_timeout = deadline - time.time()
+ if rpc_timeout < 0:
+ break
+ # A single RPC call cannot exceed MAX_TIMEOUT.
+ rpc_timeout = min(rpc_timeout, MAX_TIMEOUT)
+ try:
+ event = self.waitAndGet(event_name, rpc_timeout)
+ except TimeoutError:
+ # Ignoring TimeoutError since we need to throw one with a more
+ # specific message.
+ break
+ if predicate(event):
+ return event
+ raise TimeoutError(
+ self._ad,
+ 'Timed out after %ss waiting for an "%s" event that satisfies the '
+ 'predicate "%s".' % (timeout, event_name, predicate.__name__))
+
+ def getAll(self, event_name):
+ """Gets all the events of a certain name that have been received so
+ far. This is a non-blocking call.
+
+ Args:
+ callback_id: The id of the callback.
+ event_name: string, the name of the event to get.
+
+ Returns:
+ A list of SnippetEvent, each representing an event from the Java
+ side.
+ """
+ raw_events = self._event_client.eventGetAll(self._id, event_name)
+ return [snippet_event.from_dict(msg) for msg in raw_events]
diff --git a/mobly/controllers/android_device_lib/errors.py b/mobly/controllers/android_device_lib/errors.py
index 077841a..416054d 100644
--- a/mobly/controllers/android_device_lib/errors.py
+++ b/mobly/controllers/android_device_lib/errors.py
@@ -20,31 +20,31 @@
class Error(signals.ControllerError):
- pass
+ pass
class DeviceError(Error):
- """Raised for errors specific to an AndroidDevice object."""
+ """Raised for errors specific to an AndroidDevice object."""
- def __init__(self, ad, msg):
- template = '%s %s'
- # If the message starts with the hierarchy token, don't add the extra
- # space.
- if isinstance(msg, str) and msg.startswith(HIERARCHY_TOKEN):
- template = '%s%s'
- new_msg = template % (repr(ad), msg)
- super(DeviceError, self).__init__(new_msg)
+ def __init__(self, ad, msg):
+ template = '%s %s'
+ # If the message starts with the hierarchy token, don't add the extra
+ # space.
+ if isinstance(msg, str) and msg.startswith(HIERARCHY_TOKEN):
+ template = '%s%s'
+ new_msg = template % (repr(ad), msg)
+ super(DeviceError, self).__init__(new_msg)
class ServiceError(DeviceError):
- """Raised for errors specific to an AndroidDevice service.
+ """Raised for errors specific to an AndroidDevice service.
- A service is inherently associated with a device instance, so the service
- error type is a subtype of `DeviceError`.
- """
- SERVICE_TYPE = None
+ A service is inherently associated with a device instance, so the service
+ error type is a subtype of `DeviceError`.
+ """
+ SERVICE_TYPE = None
- def __init__(self, device, msg):
- new_msg = '%sService<%s> %s' % (HIERARCHY_TOKEN, self.SERVICE_TYPE,
- msg)
- super(ServiceError, self).__init__(device, new_msg)
+ def __init__(self, device, msg):
+ new_msg = '%sService<%s> %s' % (HIERARCHY_TOKEN, self.SERVICE_TYPE,
+ msg)
+ super(ServiceError, self).__init__(device, new_msg)
diff --git a/mobly/controllers/android_device_lib/event_dispatcher.py b/mobly/controllers/android_device_lib/event_dispatcher.py
index 1785716..8279e03 100644
--- a/mobly/controllers/android_device_lib/event_dispatcher.py
+++ b/mobly/controllers/android_device_lib/event_dispatcher.py
@@ -21,413 +21,413 @@
class EventDispatcherError(Exception):
- pass
+ pass
class IllegalStateError(EventDispatcherError):
- """Raise when user tries to put event_dispatcher into an illegal state.
- """
+ """Raise when user tries to put event_dispatcher into an illegal state.
+ """
class DuplicateError(EventDispatcherError):
- """Raise when a duplicate is being created and it shouldn't.
- """
+ """Raise when a duplicate is being created and it shouldn't.
+ """
class EventDispatcher:
- """Class managing events for an sl4a connection.
+ """Class managing events for an sl4a connection.
+ """
+
+ DEFAULT_TIMEOUT = 60
+
+ def __init__(self, sl4a):
+ self._sl4a = sl4a
+ self.started = False
+ self.executor = None
+ self.poller = None
+ self.event_dict = {}
+ self.handlers = {}
+ self.lock = threading.RLock()
+
+ def poll_events(self):
+ """Continuously polls all types of events from sl4a.
+
+ Events are sorted by name and store in separate queues.
+ If there are registered handlers, the handlers will be called with
+ corresponding event immediately upon event discovery, and the event
+ won't be stored. If exceptions occur, stop the dispatcher and return
"""
-
- DEFAULT_TIMEOUT = 60
-
- def __init__(self, sl4a):
- self._sl4a = sl4a
- self.started = False
- self.executor = None
- self.poller = None
- self.event_dict = {}
- self.handlers = {}
- self.lock = threading.RLock()
-
- def poll_events(self):
- """Continuously polls all types of events from sl4a.
-
- Events are sorted by name and store in separate queues.
- If there are registered handlers, the handlers will be called with
- corresponding event immediately upon event discovery, and the event
- won't be stored. If exceptions occur, stop the dispatcher and return
- """
- while self.started:
- event_obj = None
- event_name = None
- try:
- event_obj = self._sl4a.eventWait(50000)
- except:
- if self.started:
- print("Exception happened during polling.")
- print(traceback.format_exc())
- raise
- if not event_obj:
- continue
- elif 'name' not in event_obj:
- print("Received Malformed event {}".format(event_obj))
- continue
- else:
- event_name = event_obj['name']
- # if handler registered, process event
- if event_name in self.handlers:
- self.handle_subscribed_event(event_obj, event_name)
- if event_name == "EventDispatcherShutdown":
- self._sl4a.closeSl4aSession()
- break
- else:
- self.lock.acquire()
- if event_name in self.event_dict: # otherwise, cache event
- self.event_dict[event_name].put(event_obj)
- else:
- q = queue.Queue()
- q.put(event_obj)
- self.event_dict[event_name] = q
- self.lock.release()
-
- def register_handler(self, handler, event_name, args):
- """Registers an event handler.
-
- One type of event can only have one event handler associated with it.
-
- Args:
- handler: The event handler function to be registered.
- event_name: Name of the event the handler is for.
- args: User arguments to be passed to the handler when it's called.
-
- Raises:
- IllegalStateError: Raised if attempts to register a handler after
- the dispatcher starts running.
- DuplicateError: Raised if attempts to register more than one
- handler for one type of event.
- """
+ while self.started:
+ event_obj = None
+ event_name = None
+ try:
+ event_obj = self._sl4a.eventWait(50000)
+ except:
if self.started:
- raise IllegalStateError(("Can't register service after polling is"
- " started"))
+ print("Exception happened during polling.")
+ print(traceback.format_exc())
+ raise
+ if not event_obj:
+ continue
+ elif 'name' not in event_obj:
+ print("Received Malformed event {}".format(event_obj))
+ continue
+ else:
+ event_name = event_obj['name']
+ # if handler registered, process event
+ if event_name in self.handlers:
+ self.handle_subscribed_event(event_obj, event_name)
+ if event_name == "EventDispatcherShutdown":
+ self._sl4a.closeSl4aSession()
+ break
+ else:
self.lock.acquire()
- try:
- if event_name in self.handlers:
- raise DuplicateError(
- 'A handler for {} already exists'.format(event_name))
- self.handlers[event_name] = (handler, args)
- finally:
- self.lock.release()
-
- def start(self):
- """Starts the event dispatcher.
-
- Initiates executor and start polling events.
-
- Raises:
- IllegalStateError: Can't start a dispatcher again when it's already
- running.
- """
- if not self.started:
- self.started = True
- self.executor = ThreadPoolExecutor(max_workers=32)
- self.poller = self.executor.submit(self.poll_events)
+ if event_name in self.event_dict: # otherwise, cache event
+ self.event_dict[event_name].put(event_obj)
else:
- raise IllegalStateError("Dispatcher is already started.")
-
- def clean_up(self):
- """Clean up and release resources after the event dispatcher polling
- loop has been broken.
-
- The following things happen:
- 1. Clear all events and flags.
- 2. Close the sl4a client the event_dispatcher object holds.
- 3. Shut down executor without waiting.
- """
- if not self.started:
- return
- self.started = False
- self.clear_all_events()
- # At this point, the sl4a apk is destroyed and nothing is listening on
- # the socket. Avoid sending any sl4a commands; just clean up the socket
- # and return.
- self._sl4a.disconnect()
- self.poller.set_result("Done")
- # The polling thread is guaranteed to finish after a max of 60 seconds,
- # so we don't wait here.
- self.executor.shutdown(wait=False)
-
- def pop_event(self, event_name, timeout=DEFAULT_TIMEOUT):
- """Pop an event from its queue.
-
- Return and remove the oldest entry of an event.
- Block until an event of specified name is available or
- times out if timeout is set.
-
- Args:
- event_name: Name of the event to be popped.
- timeout: Number of seconds to wait when event is not present.
- Never times out if None.
-
- Returns:
- The oldest entry of the specified event. None if timed out.
-
- Raises:
- IllegalStateError: Raised if pop is called before the dispatcher
- starts polling.
- """
- if not self.started:
- raise IllegalStateError(
- "Dispatcher needs to be started before popping.")
-
- e_queue = self.get_event_q(event_name)
-
- if not e_queue:
- raise TypeError(
- "Failed to get an event queue for {}".format(event_name))
-
- try:
- # Block for timeout
- if timeout:
- return e_queue.get(True, timeout)
- # Non-blocking poll for event
- elif timeout == 0:
- return e_queue.get(False)
- else:
- # Block forever on event wait
- return e_queue.get(True)
- except queue.Empty:
- raise queue.Empty('Timeout after {}s waiting for event: {}'.format(
- timeout, event_name))
-
- def wait_for_event(self,
- event_name,
- predicate,
- timeout=DEFAULT_TIMEOUT,
- *args,
- **kwargs):
- """Wait for an event that satisfies a predicate to appear.
-
- Continuously pop events of a particular name and check against the
- predicate until an event that satisfies the predicate is popped or
- timed out. Note this will remove all the events of the same name that
- do not satisfy the predicate in the process.
-
- Args:
- event_name: Name of the event to be popped.
- predicate: A function that takes an event and returns True if the
- predicate is satisfied, False otherwise.
- timeout: Number of seconds to wait.
- *args: Optional positional args passed to predicate().
- **kwargs: Optional keyword args passed to predicate().
-
- Returns:
- The event that satisfies the predicate.
-
- Raises:
- queue.Empty: Raised if no event that satisfies the predicate was
- found before time out.
- """
- deadline = time.time() + timeout
-
- while True:
- event = None
- try:
- event = self.pop_event(event_name, 1)
- except queue.Empty:
- pass
-
- if event and predicate(event, *args, **kwargs):
- return event
-
- if time.time() > deadline:
- raise queue.Empty(
- 'Timeout after {}s waiting for event: {}'.format(
- timeout, event_name))
-
- def pop_events(self, regex_pattern, timeout):
- """Pop events whose names match a regex pattern.
-
- If such event(s) exist, pop one event from each event queue that
- satisfies the condition. Otherwise, wait for an event that satisfies
- the condition to occur, with timeout.
-
- Results are sorted by timestamp in ascending order.
-
- Args:
- regex_pattern: The regular expression pattern that an event name
- should match in order to be popped.
- timeout: Number of seconds to wait for events in case no event
- matching the condition exits when the function is called.
-
- Returns:
- Events whose names match a regex pattern.
- Empty if none exist and the wait timed out.
-
- Raises:
- IllegalStateError: Raised if pop is called before the dispatcher
- starts polling.
- queue.Empty: Raised if no event was found before time out.
- """
- if not self.started:
- raise IllegalStateError(
- "Dispatcher needs to be started before popping.")
- deadline = time.time() + timeout
- while True:
- #TODO: fix the sleep loop
- results = self._match_and_pop(regex_pattern)
- if len(results) != 0 or time.time() > deadline:
- break
- time.sleep(1)
- if len(results) == 0:
- raise queue.Empty('Timeout after {}s waiting for event: {}'.format(
- timeout, regex_pattern))
-
- return sorted(results, key=lambda event: event['time'])
-
- def _match_and_pop(self, regex_pattern):
- """Pop one event from each of the event queues whose names
- match (in a sense of regular expression) regex_pattern.
- """
- results = []
- self.lock.acquire()
- for name in self.event_dict.keys():
- if re.match(regex_pattern, name):
- q = self.event_dict[name]
- if q:
- try:
- results.append(q.get(False))
- except:
- pass
- self.lock.release()
- return results
-
- def get_event_q(self, event_name):
- """Obtain the queue storing events of the specified name.
-
- If no event of this name has been polled, wait for one to.
-
- Returns:
- A queue storing all the events of the specified name.
- None if timed out.
-
- Raises:
- queue.Empty: Raised if the queue does not exist and timeout has
- passed.
- """
- self.lock.acquire()
- if not event_name in self.event_dict or self.event_dict[
- event_name] is None:
- self.event_dict[event_name] = queue.Queue()
+ q = queue.Queue()
+ q.put(event_obj)
+ self.event_dict[event_name] = q
self.lock.release()
- event_queue = self.event_dict[event_name]
- return event_queue
+ def register_handler(self, handler, event_name, args):
+ """Registers an event handler.
- def handle_subscribed_event(self, event_obj, event_name):
- """Execute the registered handler of an event.
+ One type of event can only have one event handler associated with it.
- Retrieve the handler and its arguments, and execute the handler in a
- new thread.
+ Args:
+ handler: The event handler function to be registered.
+ event_name: Name of the event the handler is for.
+ args: User arguments to be passed to the handler when it's called.
- Args:
- event_obj: Json object of the event.
- event_name: Name of the event to call handler for.
- """
- handler, args = self.handlers[event_name]
- self.executor.submit(handler, event_obj, *args)
+ Raises:
+ IllegalStateError: Raised if attempts to register a handler after
+ the dispatcher starts running.
+ DuplicateError: Raised if attempts to register more than one
+ handler for one type of event.
+ """
+ if self.started:
+ raise IllegalStateError(("Can't register service after polling is"
+ " started"))
+ self.lock.acquire()
+ try:
+ if event_name in self.handlers:
+ raise DuplicateError(
+ 'A handler for {} already exists'.format(event_name))
+ self.handlers[event_name] = (handler, args)
+ finally:
+ self.lock.release()
- def _handle(self, event_handler, event_name, user_args, event_timeout,
- cond, cond_timeout):
- """Pop an event of specified type and calls its handler on it. If
- condition is not None, block until condition is met or timeout.
- """
- if cond:
- cond.wait(cond_timeout)
- event = self.pop_event(event_name, event_timeout)
- return event_handler(event, *user_args)
+ def start(self):
+ """Starts the event dispatcher.
- def handle_event(self,
- event_handler,
- event_name,
- user_args,
- event_timeout=None,
- cond=None,
- cond_timeout=None):
- """Handle events that don't have registered handlers
+ Initiates executor and start polling events.
- In a new thread, poll one event of specified type from its queue and
- execute its handler. If no such event exists, the thread waits until
- one appears.
+ Raises:
+ IllegalStateError: Can't start a dispatcher again when it's already
+ running.
+ """
+ if not self.started:
+ self.started = True
+ self.executor = ThreadPoolExecutor(max_workers=32)
+ self.poller = self.executor.submit(self.poll_events)
+ else:
+ raise IllegalStateError("Dispatcher is already started.")
- Args:
- event_handler: Handler for the event, which should take at least
- one argument - the event json object.
- event_name: Name of the event to be handled.
- user_args: User arguments for the handler; to be passed in after
- the event json.
- event_timeout: Number of seconds to wait for the event to come.
- cond: A condition to wait on before executing the handler. Should
- be a threading.Event object.
- cond_timeout: Number of seconds to wait before the condition times
- out. Never times out if None.
+ def clean_up(self):
+ """Clean up and release resources after the event dispatcher polling
+ loop has been broken.
- Returns:
- A concurrent.Future object associated with the handler.
- If blocking call worker.result() is triggered, the handler
- needs to return something to unblock.
- """
- worker = self.executor.submit(self._handle, event_handler, event_name,
- user_args, event_timeout, cond,
- cond_timeout)
- return worker
+ The following things happen:
+ 1. Clear all events and flags.
+ 2. Close the sl4a client the event_dispatcher object holds.
+ 3. Shut down executor without waiting.
+ """
+ if not self.started:
+ return
+ self.started = False
+ self.clear_all_events()
+ # At this point, the sl4a apk is destroyed and nothing is listening on
+ # the socket. Avoid sending any sl4a commands; just clean up the socket
+ # and return.
+ self._sl4a.disconnect()
+ self.poller.set_result("Done")
+ # The polling thread is guaranteed to finish after a max of 60 seconds,
+ # so we don't wait here.
+ self.executor.shutdown(wait=False)
- def pop_all(self, event_name):
- """Return and remove all stored events of a specified name.
+ def pop_event(self, event_name, timeout=DEFAULT_TIMEOUT):
+ """Pop an event from its queue.
- Pops all events from their queue. May miss the latest ones.
- If no event is available, return immediately.
+ Return and remove the oldest entry of an event.
+ Block until an event of specified name is available or
+ times out if timeout is set.
- Args:
- event_name: Name of the events to be popped.
+ Args:
+ event_name: Name of the event to be popped.
+ timeout: Number of seconds to wait when event is not present.
+ Never times out if None.
- Returns:
- List of the desired events.
+ Returns:
+ The oldest entry of the specified event. None if timed out.
- Raises:
- IllegalStateError: Raised if pop is called before the dispatcher
- starts polling.
- """
- if not self.started:
- raise IllegalStateError(("Dispatcher needs to be started before "
- "popping."))
- results = []
- try:
- self.lock.acquire()
- while True:
- e = self.event_dict[event_name].get(block=False)
- results.append(e)
- except (queue.Empty, KeyError):
- return results
- finally:
- self.lock.release()
+ Raises:
+ IllegalStateError: Raised if pop is called before the dispatcher
+ starts polling.
+ """
+ if not self.started:
+ raise IllegalStateError(
+ "Dispatcher needs to be started before popping.")
- def clear_events(self, event_name):
- """Clear all events of a particular name.
+ e_queue = self.get_event_q(event_name)
- Args:
- event_name: Name of the events to be popped.
- """
- self.lock.acquire()
- try:
- q = self.get_event_q(event_name)
- q.queue.clear()
- except queue.Empty:
- return
- finally:
- self.lock.release()
+ if not e_queue:
+ raise TypeError(
+ "Failed to get an event queue for {}".format(event_name))
- def clear_all_events(self):
- """Clear all event queues and their cached events."""
- self.lock.acquire()
- self.event_dict.clear()
- self.lock.release()
+ try:
+ # Block for timeout
+ if timeout:
+ return e_queue.get(True, timeout)
+ # Non-blocking poll for event
+ elif timeout == 0:
+ return e_queue.get(False)
+ else:
+ # Block forever on event wait
+ return e_queue.get(True)
+ except queue.Empty:
+ raise queue.Empty('Timeout after {}s waiting for event: {}'.format(
+ timeout, event_name))
+
+ def wait_for_event(self,
+ event_name,
+ predicate,
+ timeout=DEFAULT_TIMEOUT,
+ *args,
+ **kwargs):
+ """Wait for an event that satisfies a predicate to appear.
+
+ Continuously pop events of a particular name and check against the
+ predicate until an event that satisfies the predicate is popped or
+ timed out. Note this will remove all the events of the same name that
+ do not satisfy the predicate in the process.
+
+ Args:
+ event_name: Name of the event to be popped.
+ predicate: A function that takes an event and returns True if the
+ predicate is satisfied, False otherwise.
+ timeout: Number of seconds to wait.
+ *args: Optional positional args passed to predicate().
+ **kwargs: Optional keyword args passed to predicate().
+
+ Returns:
+ The event that satisfies the predicate.
+
+ Raises:
+ queue.Empty: Raised if no event that satisfies the predicate was
+ found before time out.
+ """
+ deadline = time.time() + timeout
+
+ while True:
+ event = None
+ try:
+ event = self.pop_event(event_name, 1)
+ except queue.Empty:
+ pass
+
+ if event and predicate(event, *args, **kwargs):
+ return event
+
+ if time.time() > deadline:
+ raise queue.Empty(
+ 'Timeout after {}s waiting for event: {}'.format(
+ timeout, event_name))
+
+ def pop_events(self, regex_pattern, timeout):
+ """Pop events whose names match a regex pattern.
+
+ If such event(s) exist, pop one event from each event queue that
+ satisfies the condition. Otherwise, wait for an event that satisfies
+ the condition to occur, with timeout.
+
+ Results are sorted by timestamp in ascending order.
+
+ Args:
+ regex_pattern: The regular expression pattern that an event name
+ should match in order to be popped.
+ timeout: Number of seconds to wait for events in case no event
+ matching the condition exits when the function is called.
+
+ Returns:
+ Events whose names match a regex pattern.
+ Empty if none exist and the wait timed out.
+
+ Raises:
+ IllegalStateError: Raised if pop is called before the dispatcher
+ starts polling.
+ queue.Empty: Raised if no event was found before time out.
+ """
+ if not self.started:
+ raise IllegalStateError(
+ "Dispatcher needs to be started before popping.")
+ deadline = time.time() + timeout
+ while True:
+ #TODO: fix the sleep loop
+ results = self._match_and_pop(regex_pattern)
+ if len(results) != 0 or time.time() > deadline:
+ break
+ time.sleep(1)
+ if len(results) == 0:
+ raise queue.Empty('Timeout after {}s waiting for event: {}'.format(
+ timeout, regex_pattern))
+
+ return sorted(results, key=lambda event: event['time'])
+
+ def _match_and_pop(self, regex_pattern):
+ """Pop one event from each of the event queues whose names
+ match (in a sense of regular expression) regex_pattern.
+ """
+ results = []
+ self.lock.acquire()
+ for name in self.event_dict.keys():
+ if re.match(regex_pattern, name):
+ q = self.event_dict[name]
+ if q:
+ try:
+ results.append(q.get(False))
+ except:
+ pass
+ self.lock.release()
+ return results
+
+ def get_event_q(self, event_name):
+ """Obtain the queue storing events of the specified name.
+
+ If no event of this name has been polled, wait for one to.
+
+ Returns:
+ A queue storing all the events of the specified name.
+ None if timed out.
+
+ Raises:
+ queue.Empty: Raised if the queue does not exist and timeout has
+ passed.
+ """
+ self.lock.acquire()
+ if not event_name in self.event_dict or self.event_dict[
+ event_name] is None:
+ self.event_dict[event_name] = queue.Queue()
+ self.lock.release()
+
+ event_queue = self.event_dict[event_name]
+ return event_queue
+
+ def handle_subscribed_event(self, event_obj, event_name):
+ """Execute the registered handler of an event.
+
+ Retrieve the handler and its arguments, and execute the handler in a
+ new thread.
+
+ Args:
+ event_obj: Json object of the event.
+ event_name: Name of the event to call handler for.
+ """
+ handler, args = self.handlers[event_name]
+ self.executor.submit(handler, event_obj, *args)
+
+ def _handle(self, event_handler, event_name, user_args, event_timeout,
+ cond, cond_timeout):
+ """Pop an event of specified type and calls its handler on it. If
+ condition is not None, block until condition is met or timeout.
+ """
+ if cond:
+ cond.wait(cond_timeout)
+ event = self.pop_event(event_name, event_timeout)
+ return event_handler(event, *user_args)
+
+ def handle_event(self,
+ event_handler,
+ event_name,
+ user_args,
+ event_timeout=None,
+ cond=None,
+ cond_timeout=None):
+ """Handle events that don't have registered handlers
+
+ In a new thread, poll one event of specified type from its queue and
+ execute its handler. If no such event exists, the thread waits until
+ one appears.
+
+ Args:
+ event_handler: Handler for the event, which should take at least
+ one argument - the event json object.
+ event_name: Name of the event to be handled.
+ user_args: User arguments for the handler; to be passed in after
+ the event json.
+ event_timeout: Number of seconds to wait for the event to come.
+ cond: A condition to wait on before executing the handler. Should
+ be a threading.Event object.
+ cond_timeout: Number of seconds to wait before the condition times
+ out. Never times out if None.
+
+ Returns:
+ A concurrent.Future object associated with the handler.
+ If blocking call worker.result() is triggered, the handler
+ needs to return something to unblock.
+ """
+ worker = self.executor.submit(self._handle, event_handler, event_name,
+ user_args, event_timeout, cond,
+ cond_timeout)
+ return worker
+
+ def pop_all(self, event_name):
+ """Return and remove all stored events of a specified name.
+
+ Pops all events from their queue. May miss the latest ones.
+ If no event is available, return immediately.
+
+ Args:
+ event_name: Name of the events to be popped.
+
+ Returns:
+ List of the desired events.
+
+ Raises:
+ IllegalStateError: Raised if pop is called before the dispatcher
+ starts polling.
+ """
+ if not self.started:
+ raise IllegalStateError(("Dispatcher needs to be started before "
+ "popping."))
+ results = []
+ try:
+ self.lock.acquire()
+ while True:
+ e = self.event_dict[event_name].get(block=False)
+ results.append(e)
+ except (queue.Empty, KeyError):
+ return results
+ finally:
+ self.lock.release()
+
+ def clear_events(self, event_name):
+ """Clear all events of a particular name.
+
+ Args:
+ event_name: Name of the events to be popped.
+ """
+ self.lock.acquire()
+ try:
+ q = self.get_event_q(event_name)
+ q.queue.clear()
+ except queue.Empty:
+ return
+ finally:
+ self.lock.release()
+
+ def clear_all_events(self):
+ """Clear all event queues and their cached events."""
+ self.lock.acquire()
+ self.event_dict.clear()
+ self.lock.release()
diff --git a/mobly/controllers/android_device_lib/fastboot.py b/mobly/controllers/android_device_lib/fastboot.py
index d62870a..0034547 100644
--- a/mobly/controllers/android_device_lib/fastboot.py
+++ b/mobly/controllers/android_device_lib/fastboot.py
@@ -16,54 +16,54 @@
def exe_cmd(*cmds):
- """Executes commands in a new shell. Directing stderr to PIPE.
+ """Executes commands in a new shell. Directing stderr to PIPE.
- This is fastboot's own exe_cmd because of its peculiar way of writing
- non-error info to stderr.
+ This is fastboot's own exe_cmd because of its peculiar way of writing
+ non-error info to stderr.
- Args:
- cmds: A sequence of commands and arguments.
+ Args:
+ cmds: A sequence of commands and arguments.
- Returns:
- The output of the command run.
+ Returns:
+ The output of the command run.
- Raises:
- Exception: An error occurred during the command execution.
- """
- cmd = ' '.join(cmds)
- proc = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True)
- (out, err) = proc.communicate()
- if not err:
- return out
- return err
+ Raises:
+ Exception: An error occurred during the command execution.
+ """
+ cmd = ' '.join(cmds)
+ proc = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True)
+ (out, err) = proc.communicate()
+ if not err:
+ return out
+ return err
class FastbootProxy():
- """Proxy class for fastboot.
+ """Proxy class for fastboot.
- For syntactic reasons, the '-' in fastboot commands need to be replaced
- with '_'. Can directly execute fastboot commands on an object:
- >> fb = FastbootProxy(<serial>)
- >> fb.devices() # will return the console output of "fastboot devices".
- """
+ For syntactic reasons, the '-' in fastboot commands need to be replaced
+ with '_'. Can directly execute fastboot commands on an object:
+ >> fb = FastbootProxy(<serial>)
+ >> fb.devices() # will return the console output of "fastboot devices".
+ """
- def __init__(self, serial=""):
- self.serial = serial
- if serial:
- self.fastboot_str = "fastboot -s {}".format(serial)
- else:
- self.fastboot_str = "fastboot"
+ def __init__(self, serial=""):
+ self.serial = serial
+ if serial:
+ self.fastboot_str = "fastboot -s {}".format(serial)
+ else:
+ self.fastboot_str = "fastboot"
- def _exec_fastboot_cmd(self, name, arg_str):
- return exe_cmd(' '.join((self.fastboot_str, name, arg_str)))
+ def _exec_fastboot_cmd(self, name, arg_str):
+ return exe_cmd(' '.join((self.fastboot_str, name, arg_str)))
- def args(self, *args):
- return exe_cmd(' '.join((self.fastboot_str, ) + args))
+ def args(self, *args):
+ return exe_cmd(' '.join((self.fastboot_str, ) + args))
- def __getattr__(self, name):
- def fastboot_call(*args):
- clean_name = name.replace('_', '-')
- arg_str = ' '.join(str(elem) for elem in args)
- return self._exec_fastboot_cmd(clean_name, arg_str)
+ def __getattr__(self, name):
+ def fastboot_call(*args):
+ clean_name = name.replace('_', '-')
+ arg_str = ' '.join(str(elem) for elem in args)
+ return self._exec_fastboot_cmd(clean_name, arg_str)
- return fastboot_call
+ return fastboot_call
diff --git a/mobly/controllers/android_device_lib/jsonrpc_shell_base.py b/mobly/controllers/android_device_lib/jsonrpc_shell_base.py
index 82fe0ac..acbcf7d 100755
--- a/mobly/controllers/android_device_lib/jsonrpc_shell_base.py
+++ b/mobly/controllers/android_device_lib/jsonrpc_shell_base.py
@@ -23,72 +23,72 @@
class Error(Exception):
- pass
+ pass
class JsonRpcShellBase(object):
- def _start_services(self, console_env):
- """Starts the services needed by this client and adds them to console_env.
+ def _start_services(self, console_env):
+ """Starts the services needed by this client and adds them to console_env.
- Must be implemented by subclasses.
- """
- raise NotImplemented()
+ Must be implemented by subclasses.
+ """
+ raise NotImplemented()
- def _get_banner(self, serial):
- """Returns the user-friendly banner message to print before the console.
+ def _get_banner(self, serial):
+ """Returns the user-friendly banner message to print before the console.
- Must be implemented by subclasses.
- """
- raise NotImplemented()
+ Must be implemented by subclasses.
+ """
+ raise NotImplemented()
- def load_device(self, serial=None):
- """Creates an AndroidDevice for the given serial number.
+ def load_device(self, serial=None):
+ """Creates an AndroidDevice for the given serial number.
- If no serial is given, it will read from the ANDROID_SERIAL
- environmental variable. If the environmental variable is not set, then
- it will read from 'adb devices' if there is only one.
- """
- serials = android_device.list_adb_devices()
- if not serials:
- raise Error('No adb device found!')
- # No serial provided, try to pick up the device automatically.
- if not serial:
- env_serial = os.environ.get('ANDROID_SERIAL', None)
- if env_serial is not None:
- serial = env_serial
- elif len(serials) == 1:
- serial = serials[0]
- else:
- raise Error(
- 'Expected one phone, but %d found. Use the -s flag or '
- 'specify ANDROID_SERIAL.' % len(serials))
- if serial not in serials:
- raise Error('Device "%s" is not found by adb.' % serial)
- ads = android_device.get_instances([serial])
- assert len(ads) == 1
- self._ad = ads[0]
+ If no serial is given, it will read from the ANDROID_SERIAL
+ environmental variable. If the environmental variable is not set, then
+ it will read from 'adb devices' if there is only one.
+ """
+ serials = android_device.list_adb_devices()
+ if not serials:
+ raise Error('No adb device found!')
+ # No serial provided, try to pick up the device automatically.
+ if not serial:
+ env_serial = os.environ.get('ANDROID_SERIAL', None)
+ if env_serial is not None:
+ serial = env_serial
+ elif len(serials) == 1:
+ serial = serials[0]
+ else:
+ raise Error(
+ 'Expected one phone, but %d found. Use the -s flag or '
+ 'specify ANDROID_SERIAL.' % len(serials))
+ if serial not in serials:
+ raise Error('Device "%s" is not found by adb.' % serial)
+ ads = android_device.get_instances([serial])
+ assert len(ads) == 1
+ self._ad = ads[0]
- def start_console(self):
- # Set up initial console environment
- console_env = {
- 'ad': self._ad,
- 'pprint': pprint.pprint,
- }
+ def start_console(self):
+ # Set up initial console environment
+ console_env = {
+ 'ad': self._ad,
+ 'pprint': pprint.pprint,
+ }
- # Start the services
- self._start_services(console_env)
+ # Start the services
+ self._start_services(console_env)
- # Start the console
- console_banner = self._get_banner(self._ad.serial)
- code.interact(banner=console_banner, local=console_env)
+ # Start the console
+ console_banner = self._get_banner(self._ad.serial)
+ code.interact(banner=console_banner, local=console_env)
- # Tear everything down
- self._ad.services.stop_all()
+ # Tear everything down
+ self._ad.services.stop_all()
- def main(self, serial=None):
- try:
- self.load_device(serial)
- except Error as e:
- print('ERROR: %s' % e, file=sys.stderr)
- sys.exit(1)
- self.start_console()
+ def main(self, serial=None):
+ try:
+ self.load_device(serial)
+ except Error as e:
+ print('ERROR: %s' % e, file=sys.stderr)
+ sys.exit(1)
+ self.start_console()
diff --git a/mobly/controllers/android_device_lib/service_manager.py b/mobly/controllers/android_device_lib/service_manager.py
index 78cc170..48f1aaf 100644
--- a/mobly/controllers/android_device_lib/service_manager.py
+++ b/mobly/controllers/android_device_lib/service_manager.py
@@ -24,236 +24,236 @@
class Error(errors.DeviceError):
- """Root error type for this module."""
+ """Root error type for this module."""
class ServiceManager(object):
- """Manager for services of AndroidDevice.
+ """Manager for services of AndroidDevice.
- A service is a long running process that involves an Android device, like
- adb logcat or Snippet.
+ A service is a long running process that involves an Android device, like
+ adb logcat or Snippet.
+ """
+
+ def __init__(self, device):
+ self._service_objects = collections.OrderedDict()
+ self._device = device
+
+ def has_service_by_name(self, name):
+ """Checks if the manager has a service registered with a specific name.
+
+ Args:
+ name: string, the name to look for.
+
+ Returns:
+ True if a service is registered with the specified name, False
+ otherwise.
"""
+ return name in self._service_objects
- def __init__(self, device):
- self._service_objects = collections.OrderedDict()
- self._device = device
+ @property
+ def is_any_alive(self):
+ """True if any service is alive; False otherwise."""
+ for service in self._service_objects.values():
+ if service.is_alive:
+ return True
+ return False
- def has_service_by_name(self, name):
- """Checks if the manager has a service registered with a specific name.
+ def register(self, alias, service_class, configs=None, start_service=True):
+ """Registers a service.
- Args:
- name: string, the name to look for.
+ This will create a service instance, starts the service, and adds the
+ instance to the mananger.
- Returns:
- True if a service is registered with the specified name, False
- otherwise.
- """
- return name in self._service_objects
+ Args:
+ alias: string, the alias for this instance.
+ service_class: class, the service class to instantiate.
+ configs: (optional) config object to pass to the service class's
+ constructor.
+ start_service: bool, whether to start the service instance or not.
+ Default is True.
+ """
+ if not inspect.isclass(service_class):
+ raise Error(self._device, '"%s" is not a class!' % service_class)
+ if not issubclass(service_class, base_service.BaseService):
+ raise Error(
+ self._device,
+ 'Class %s is not a subclass of BaseService!' % service_class)
+ if alias in self._service_objects:
+ raise Error(
+ self._device,
+ 'A service is already registered with alias "%s".' % alias)
+ service_obj = service_class(self._device, configs)
+ service_obj.alias = alias
+ if start_service:
+ service_obj.start()
+ self._service_objects[alias] = service_obj
- @property
- def is_any_alive(self):
- """True if any service is alive; False otherwise."""
- for service in self._service_objects.values():
- if service.is_alive:
- return True
- return False
+ def unregister(self, alias):
+ """Unregisters a service instance.
- def register(self, alias, service_class, configs=None, start_service=True):
- """Registers a service.
+ Stops a service and removes it from the manager.
- This will create a service instance, starts the service, and adds the
- instance to the mananger.
+ Args:
+ alias: string, the alias of the service instance to unregister.
+ """
+ if alias not in self._service_objects:
+ raise Error(self._device,
+ 'No service is registered with alias "%s".' % alias)
+ service_obj = self._service_objects.pop(alias)
+ if service_obj.is_alive:
+ with expects.expect_no_raises(
+ 'Failed to stop service instance "%s".' % alias):
+ service_obj.stop()
- Args:
- alias: string, the alias for this instance.
- service_class: class, the service class to instantiate.
- configs: (optional) config object to pass to the service class's
- constructor.
- start_service: bool, whether to start the service instance or not.
- Default is True.
- """
- if not inspect.isclass(service_class):
- raise Error(self._device, '"%s" is not a class!' % service_class)
- if not issubclass(service_class, base_service.BaseService):
- raise Error(
- self._device,
- 'Class %s is not a subclass of BaseService!' % service_class)
- if alias in self._service_objects:
- raise Error(
- self._device,
- 'A service is already registered with alias "%s".' % alias)
- service_obj = service_class(self._device, configs)
- service_obj.alias = alias
- if start_service:
- service_obj.start()
- self._service_objects[alias] = service_obj
+ def for_each(self, func):
+ """Executes a function with all registered services.
- def unregister(self, alias):
- """Unregisters a service instance.
+ Args:
+ func: function, the function to execute. This function should take
+ a service object as args.
+ """
+ aliases = list(self._service_objects.keys())
+ for alias in aliases:
+ with expects.expect_no_raises(
+ 'Failed to execute "%s" for service "%s".' %
+ (func.__name__, alias)):
+ func(self._service_objects[alias])
- Stops a service and removes it from the manager.
+ def list_live_services(self):
+ """Lists the aliases of all the services that are alive.
- Args:
- alias: string, the alias of the service instance to unregister.
- """
- if alias not in self._service_objects:
- raise Error(self._device,
- 'No service is registered with alias "%s".' % alias)
- service_obj = self._service_objects.pop(alias)
- if service_obj.is_alive:
- with expects.expect_no_raises(
- 'Failed to stop service instance "%s".' % alias):
- service_obj.stop()
+ Order of this list is determined by the order the services are
+ registered in.
- def for_each(self, func):
- """Executes a function with all registered services.
+ Returns:
+ list of strings, the aliases of the services that are running.
+ """
+ aliases = []
+ self.for_each(lambda service: aliases.append(service.alias)
+ if service.is_alive else None)
+ return aliases
- Args:
- func: function, the function to execute. This function should take
- a service object as args.
- """
- aliases = list(self._service_objects.keys())
- for alias in aliases:
- with expects.expect_no_raises(
- 'Failed to execute "%s" for service "%s".' %
- (func.__name__, alias)):
- func(self._service_objects[alias])
+ def create_output_excerpts_all(self, test_info):
+ """Creates output excerpts from all services.
- def list_live_services(self):
- """Lists the aliases of all the services that are alive.
+ This calls `create_output_excerpts` on all registered services.
- Order of this list is determined by the order the services are
- registered in.
+ Args:
+ test_info: RuntimeTestInfo, the test info associated with the scope
+ of the excerpts.
- Returns:
- list of strings, the aliases of the services that are running.
- """
- aliases = []
- self.for_each(lambda service: aliases.append(service.alias)
- if service.is_alive else None)
- return aliases
+ Returns:
+ Dict, keys are the names of the services, values are the paths to
+ the excerpt files created by the corresponding services.
+ """
+ excerpt_paths = {}
- def create_output_excerpts_all(self, test_info):
- """Creates output excerpts from all services.
+ def create_output_excerpts_for_one(service):
+ if not service.is_alive:
+ return
+ paths = service.create_output_excerpts(test_info)
+ excerpt_paths[service.alias] = paths
- This calls `create_output_excerpts` on all registered services.
+ self.for_each(create_output_excerpts_for_one)
+ return excerpt_paths
- Args:
- test_info: RuntimeTestInfo, the test info associated with the scope
- of the excerpts.
+ def unregister_all(self):
+ """Safely unregisters all active instances.
- Returns:
- Dict, keys are the names of the services, values are the paths to
- the excerpt files created by the corresponding services.
- """
- excerpt_paths = {}
+ Errors occurred here will be recorded but not raised.
+ """
+ aliases = list(self._service_objects.keys())
+ for alias in aliases:
+ self.unregister(alias)
- def create_output_excerpts_for_one(service):
- if not service.is_alive:
- return
- paths = service.create_output_excerpts(test_info)
- excerpt_paths[service.alias] = paths
+ def start_all(self):
+ """Starts all inactive service instances.
- self.for_each(create_output_excerpts_for_one)
- return excerpt_paths
+ Services will be started in the order they were registered.
+ """
+ for alias, service in self._service_objects.items():
+ if not service.is_alive:
+ with expects.expect_no_raises('Failed to start service "%s".' %
+ alias):
+ service.start()
- def unregister_all(self):
- """Safely unregisters all active instances.
+ def start_services(self, service_alises):
+ """Starts the specified services.
- Errors occurred here will be recorded but not raised.
- """
- aliases = list(self._service_objects.keys())
- for alias in aliases:
- self.unregister(alias)
+ Services will be started in the order specified by the input list.
+ No-op for services that are already running.
- def start_all(self):
- """Starts all inactive service instances.
+ Args:
+ service_alises: list of strings, the aliases of services to start.
+ """
+ for name in service_alises:
+ if name not in self._service_objects:
+ raise Error(
+ self._device,
+ 'No service is registered under the name "%s", cannot start.'
+ % name)
+ service = self._service_objects[name]
+ if not service.is_alive:
+ service.start()
- Services will be started in the order they were registered.
- """
- for alias, service in self._service_objects.items():
- if not service.is_alive:
- with expects.expect_no_raises('Failed to start service "%s".' %
- alias):
- service.start()
+ def stop_all(self):
+ """Stops all active service instances.
- def start_services(self, service_alises):
- """Starts the specified services.
+ Services will be stopped in the reverse order they were registered.
+ """
+ # OrdereDict#items does not return a sequence in Python 3.4, so we have
+ # to do a list conversion here.
+ for alias, service in reversed(list(self._service_objects.items())):
+ if service.is_alive:
+ with expects.expect_no_raises('Failed to stop service "%s".' %
+ alias):
+ service.stop()
- Services will be started in the order specified by the input list.
- No-op for services that are already running.
+ def pause_all(self):
+ """Pauses all service instances.
- Args:
- service_alises: list of strings, the aliases of services to start.
- """
- for name in service_alises:
- if name not in self._service_objects:
- raise Error(
- self._device,
- 'No service is registered under the name "%s", cannot start.'
- % name)
- service = self._service_objects[name]
- if not service.is_alive:
- service.start()
+ Services will be paused in the reverse order they were registered.
+ """
+ # OrdereDict#items does not return a sequence in Python 3.4, so we have
+ # to do a list conversion here.
+ for alias, service in reversed(list(self._service_objects.items())):
+ with expects.expect_no_raises('Failed to pause service "%s".' %
+ alias):
+ service.pause()
- def stop_all(self):
- """Stops all active service instances.
+ def resume_all(self):
+ """Resumes all service instances.
- Services will be stopped in the reverse order they were registered.
- """
- # OrdereDict#items does not return a sequence in Python 3.4, so we have
- # to do a list conversion here.
- for alias, service in reversed(list(self._service_objects.items())):
- if service.is_alive:
- with expects.expect_no_raises('Failed to stop service "%s".' %
- alias):
- service.stop()
+ Services will be resumed in the order they were registered.
+ """
+ for alias, service in self._service_objects.items():
+ with expects.expect_no_raises('Failed to resume service "%s".' %
+ alias):
+ service.resume()
- def pause_all(self):
- """Pauses all service instances.
+ def resume_services(self, service_alises):
+ """Resumes the specified services.
- Services will be paused in the reverse order they were registered.
- """
- # OrdereDict#items does not return a sequence in Python 3.4, so we have
- # to do a list conversion here.
- for alias, service in reversed(list(self._service_objects.items())):
- with expects.expect_no_raises('Failed to pause service "%s".' %
- alias):
- service.pause()
+ Services will be resumed in the order specified by the input list.
- def resume_all(self):
- """Resumes all service instances.
+ Args:
+ service_alises: list of strings, the names of services to start.
+ """
+ for name in service_alises:
+ if name not in self._service_objects:
+ raise Error(
+ self._device,
+ 'No service is registered under the name "%s", cannot resume.'
+ % name)
+ service = self._service_objects[name]
+ service.resume()
- Services will be resumed in the order they were registered.
- """
- for alias, service in self._service_objects.items():
- with expects.expect_no_raises('Failed to resume service "%s".' %
- alias):
- service.resume()
+ def __getattr__(self, name):
+ """Syntactic sugar to enable direct access of service objects by alias.
- def resume_services(self, service_alises):
- """Resumes the specified services.
-
- Services will be resumed in the order specified by the input list.
-
- Args:
- service_alises: list of strings, the names of services to start.
- """
- for name in service_alises:
- if name not in self._service_objects:
- raise Error(
- self._device,
- 'No service is registered under the name "%s", cannot resume.'
- % name)
- service = self._service_objects[name]
- service.resume()
-
- def __getattr__(self, name):
- """Syntactic sugar to enable direct access of service objects by alias.
-
- Args:
- name: string, the alias a service object was registered under.
- """
- if self.has_service_by_name(name):
- return self._service_objects[name]
- return self.__getattribute__(name)
+ Args:
+ name: string, the alias a service object was registered under.
+ """
+ if self.has_service_by_name(name):
+ return self._service_objects[name]
+ return self.__getattribute__(name)
diff --git a/mobly/controllers/android_device_lib/services/base_service.py b/mobly/controllers/android_device_lib/services/base_service.py
index c785db2..5140a6a 100644
--- a/mobly/controllers/android_device_lib/services/base_service.py
+++ b/mobly/controllers/android_device_lib/services/base_service.py
@@ -16,112 +16,112 @@
#TODO(xpconanfan): use `abc` after py2 deprecation.
class BaseService(object):
- """Base class of a Mobly AndroidDevice service.
+ """Base class of a Mobly AndroidDevice service.
- This class defines the interface for Mobly's AndroidDevice service.
+ This class defines the interface for Mobly's AndroidDevice service.
+ """
+ _alias = None
+
+ def __init__(self, device, configs=None):
+ """Constructor of the class.
+
+ The constructor is the only place to pass in a config. If you need to
+ change the config later, you should unregister the service instance
+ from `ServiceManager` and register again with the new config.
+
+ Args:
+ device: the device object this service is associated with.
+ config: optional configuration defined by the author of the service
+ class.
"""
- _alias = None
+ self._device = device
+ self._configs = configs
- def __init__(self, device, configs=None):
- """Constructor of the class.
+ @property
+ def alias(self):
+ """String, alias used to register this service with service manager.
- The constructor is the only place to pass in a config. If you need to
- change the config later, you should unregister the service instance
- from `ServiceManager` and register again with the new config.
+ This can be None if the service is never registered.
+ """
+ return self._alias
- Args:
- device: the device object this service is associated with.
- config: optional configuration defined by the author of the service
- class.
- """
- self._device = device
- self._configs = configs
+ @alias.setter
+ def alias(self, alias):
+ self._alias = alias
- @property
- def alias(self):
- """String, alias used to register this service with service manager.
+ @property
+ def is_alive(self):
+ """True if the service is active; False otherwise."""
+ raise NotImplementedError('"is_alive" is a required service property.')
- This can be None if the service is never registered.
- """
- return self._alias
+ def start(self):
+ """Starts the service."""
+ raise NotImplementedError('"start" is a required service method.')
- @alias.setter
- def alias(self, alias):
- self._alias = alias
+ def stop(self):
+ """Stops the service and cleans up all resources.
- @property
- def is_alive(self):
- """True if the service is active; False otherwise."""
- raise NotImplementedError('"is_alive" is a required service property.')
+ This method should handle any error and not throw.
+ """
+ raise NotImplementedError('"stop" is a required service method.')
- def start(self):
- """Starts the service."""
- raise NotImplementedError('"start" is a required service method.')
+ def pause(self):
+ """Pauses a service temporarily.
- def stop(self):
- """Stops the service and cleans up all resources.
+ For when the Python service object needs to temporarily lose connection
+ to the device without shutting down the service running on the actual
+ device.
- This method should handle any error and not throw.
- """
- raise NotImplementedError('"stop" is a required service method.')
+ This is relevant when a service needs to maintain a constant connection
+ to the device and the connection is lost if USB connection to the
+ device is disrupted.
- def pause(self):
- """Pauses a service temporarily.
+ E.g. a services that utilizes a socket connection over adb port
+ forwarding would need to implement this for the situation where the USB
+ connection to the device will be temporarily cut, but the device is not
+ rebooted.
- For when the Python service object needs to temporarily lose connection
- to the device without shutting down the service running on the actual
- device.
+ For more context, see:
+ `mobly.controllers.android_device.AndroidDevice.handle_usb_disconnect`
- This is relevant when a service needs to maintain a constant connection
- to the device and the connection is lost if USB connection to the
- device is disrupted.
+ If not implemented, we assume the service is not sensitive to device
+ disconnect, and `stop` will be called by default.
+ """
+ self.stop()
- E.g. a services that utilizes a socket connection over adb port
- forwarding would need to implement this for the situation where the USB
- connection to the device will be temporarily cut, but the device is not
- rebooted.
+ def resume(self):
+ """Resumes a paused service.
- For more context, see:
- `mobly.controllers.android_device.AndroidDevice.handle_usb_disconnect`
+ Same context as the `pause` method. This should resume the service
+ after the connection to the device has been re-established.
- If not implemented, we assume the service is not sensitive to device
- disconnect, and `stop` will be called by default.
- """
- self.stop()
+ If not implemented, we assume the service is not sensitive to device
+ disconnect, and `start` will be called by default.
+ """
+ self.start()
- def resume(self):
- """Resumes a paused service.
+ def create_output_excerpts(self, test_info):
+ """Creates excerpts of the service's output files.
- Same context as the `pause` method. This should resume the service
- after the connection to the device has been re-established.
+ [Optional] This method only applies to services with output files.
- If not implemented, we assume the service is not sensitive to device
- disconnect, and `start` will be called by default.
- """
- self.start()
+ For services that generates output files, calling this method would
+ create excerpts of the output files. An excerpt should contain info
+ between two calls of `create_output_excerpts` or from the start of the
+ service to the call to `create_output_excerpts`.
- def create_output_excerpts(self, test_info):
- """Creates excerpts of the service's output files.
+ Use `AndroidDevice#generate_filename` to get the proper filenames for
+ excerpts.
- [Optional] This method only applies to services with output files.
+ This is usually called at the end of: `setup_class`, `teardown_test`,
+ or `teardown_class`.
- For services that generates output files, calling this method would
- create excerpts of the output files. An excerpt should contain info
- between two calls of `create_output_excerpts` or from the start of the
- service to the call to `create_output_excerpts`.
+ Args:
+ test_info: RuntimeTestInfo, the test info associated with the scope
+ of the excerpts.
- Use `AndroidDevice#generate_filename` to get the proper filenames for
- excerpts.
-
- This is usually called at the end of: `setup_class`, `teardown_test`,
- or `teardown_class`.
-
- Args:
- test_info: RuntimeTestInfo, the test info associated with the scope
- of the excerpts.
-
- Returns:
- List of strings, the absolute paths to the excerpt files created.
- Empty list if no excerpt files are created.
- """
- return []
+ Returns:
+ List of strings, the absolute paths to the excerpt files created.
+ Empty list if no excerpt files are created.
+ """
+ return []
diff --git a/mobly/controllers/android_device_lib/services/sl4a_service.py b/mobly/controllers/android_device_lib/services/sl4a_service.py
index 6560444..d5c5128 100644
--- a/mobly/controllers/android_device_lib/services/sl4a_service.py
+++ b/mobly/controllers/android_device_lib/services/sl4a_service.py
@@ -18,43 +18,43 @@
class Sl4aService(base_service.BaseService):
- """Service for managing sl4a's client.
+ """Service for managing sl4a's client.
- Direct calls on the service object will forwarded to the client object as
- syntactic sugar. So `Sl4aService.doFoo()` is equivalent to
- `Sl4aClient.doFoo()`.
- """
+ Direct calls on the service object will forwarded to the client object as
+ syntactic sugar. So `Sl4aService.doFoo()` is equivalent to
+ `Sl4aClient.doFoo()`.
+ """
- def __init__(self, device, configs=None):
- del configs # Never used.
- self._ad = device
- self._sl4a_client = None
+ def __init__(self, device, configs=None):
+ del configs # Never used.
+ self._ad = device
+ self._sl4a_client = None
- @property
- def is_alive(self):
- return self._sl4a_client is not None
+ @property
+ def is_alive(self):
+ return self._sl4a_client is not None
- def start(self):
- self._sl4a_client = sl4a_client.Sl4aClient(ad=self._ad)
- self._sl4a_client.start_app_and_connect()
+ def start(self):
+ self._sl4a_client = sl4a_client.Sl4aClient(ad=self._ad)
+ self._sl4a_client.start_app_and_connect()
- def stop(self):
- if self.is_alive:
- self._sl4a_client.stop_app()
- self._sl4a_client = None
+ def stop(self):
+ if self.is_alive:
+ self._sl4a_client.stop_app()
+ self._sl4a_client = None
- def pause(self):
- # Need to stop dispatcher because it continuously polls the device.
- # It's not necessary to stop the sl4a client.
- self._sl4a_client.stop_event_dispatcher()
- self._sl4a_client.clear_host_port()
+ def pause(self):
+ # Need to stop dispatcher because it continuously polls the device.
+ # It's not necessary to stop the sl4a client.
+ self._sl4a_client.stop_event_dispatcher()
+ self._sl4a_client.clear_host_port()
- def resume(self):
- # Restore sl4a if needed.
- self._sl4a_client.restore_app_connection()
+ def resume(self):
+ # Restore sl4a if needed.
+ self._sl4a_client.restore_app_connection()
- def __getattr__(self, name):
- """Forwards the getattr calls to the client itself."""
- if self._sl4a_client:
- return getattr(self._sl4a_client, name)
- return self.__getattribute__(name)
+ def __getattr__(self, name):
+ """Forwards the getattr calls to the client itself."""
+ if self._sl4a_client:
+ return getattr(self._sl4a_client, name)
+ return self.__getattribute__(name)
diff --git a/mobly/controllers/android_device_lib/services/snippet_management_service.py b/mobly/controllers/android_device_lib/services/snippet_management_service.py
index 11358cc..5c60b25 100644
--- a/mobly/controllers/android_device_lib/services/snippet_management_service.py
+++ b/mobly/controllers/android_device_lib/services/snippet_management_service.py
@@ -20,137 +20,137 @@
class Error(errors.ServiceError):
- """Root error type for snippet management service."""
- SERVICE_TYPE = 'SnippetManagementService'
+ """Root error type for snippet management service."""
+ SERVICE_TYPE = 'SnippetManagementService'
class SnippetManagementService(base_service.BaseService):
- """Management service of snippet clients.
+ """Management service of snippet clients.
- This service manages all the snippet clients associated with an Android
- device.
+ This service manages all the snippet clients associated with an Android
+ device.
+ """
+
+ def __init__(self, device, configs=None):
+ del configs # Unused param.
+ self._device = device
+ self._is_alive = False
+ self._snippet_clients = {}
+ super(SnippetManagementService, self).__init__(device)
+
+ @property
+ def is_alive(self):
+ """True if any client is running, False otherwise."""
+ return any(
+ [client.is_alive for client in self._snippet_clients.values()])
+
+ def get_snippet_client(self, name):
+ """Gets the snippet client managed under a given name.
+
+ Args:
+ name: string, the name of the snippet client under management.
+
+ Returns:
+ SnippetClient.
"""
+ if name in self._snippet_clients:
+ return self._snippet_clients[name]
- def __init__(self, device, configs=None):
- del configs # Unused param.
- self._device = device
- self._is_alive = False
- self._snippet_clients = {}
- super(SnippetManagementService, self).__init__(device)
+ def add_snippet_client(self, name, package):
+ """Adds a snippet client to the management.
- @property
- def is_alive(self):
- """True if any client is running, False otherwise."""
- return any(
- [client.is_alive for client in self._snippet_clients.values()])
+ Args:
+ name: string, the attribute name to which to attach the snippet
+ client. E.g. `name='maps'` attaches the snippet client to
+ `ad.maps`.
+ package: string, the package name of the snippet apk to connect to.
- def get_snippet_client(self, name):
- """Gets the snippet client managed under a given name.
+ Raises:
+ Error, if a duplicated name or package is passed in.
+ """
+ # Should not load snippet with the same name more than once.
+ if name in self._snippet_clients:
+ raise Error(
+ self,
+ 'Name "%s" is already registered with package "%s", it cannot '
+ 'be used again.' %
+ (name, self._snippet_clients[name].client.package))
+ # Should not load the same snippet package more than once.
+ for snippet_name, client in self._snippet_clients.items():
+ if package == client.package:
+ raise Error(
+ self,
+ 'Snippet package "%s" has already been loaded under name'
+ ' "%s".' % (package, snippet_name))
+ client = snippet_client.SnippetClient(package=package, ad=self._device)
+ client.start_app_and_connect()
+ self._snippet_clients[name] = client
- Args:
- name: string, the name of the snippet client under management.
+ def remove_snippet_client(self, name):
+ """Removes a snippet client from management.
- Returns:
- SnippetClient.
- """
- if name in self._snippet_clients:
- return self._snippet_clients[name]
+ Args:
+ name: string, the name of the snippet client to remove.
- def add_snippet_client(self, name, package):
- """Adds a snippet client to the management.
+ Raises:
+ Error: if no snippet client is managed under the specified name.
+ """
+ if name not in self._snippet_clients:
+ raise Error(self._device, MISSING_SNIPPET_CLIENT_MSG % name)
+ client = self._snippet_clients.pop(name)
+ client.stop_app()
- Args:
- name: string, the attribute name to which to attach the snippet
- client. E.g. `name='maps'` attaches the snippet client to
- `ad.maps`.
- package: string, the package name of the snippet apk to connect to.
-
- Raises:
- Error, if a duplicated name or package is passed in.
- """
- # Should not load snippet with the same name more than once.
- if name in self._snippet_clients:
- raise Error(
- self,
- 'Name "%s" is already registered with package "%s", it cannot '
- 'be used again.' %
- (name, self._snippet_clients[name].client.package))
- # Should not load the same snippet package more than once.
- for snippet_name, client in self._snippet_clients.items():
- if package == client.package:
- raise Error(
- self,
- 'Snippet package "%s" has already been loaded under name'
- ' "%s".' % (package, snippet_name))
- client = snippet_client.SnippetClient(package=package, ad=self._device)
+ def start(self):
+ """Starts all the snippet clients under management."""
+ for client in self._snippet_clients.values():
+ if not client.is_alive:
+ self._device.log.debug('Starting SnippetClient<%s>.',
+ client.package)
client.start_app_and_connect()
- self._snippet_clients[name] = client
+ else:
+ self._device.log.debug(
+ 'Not startng SnippetClient<%s> because it is already alive.',
+ client.package)
- def remove_snippet_client(self, name):
- """Removes a snippet client from management.
-
- Args:
- name: string, the name of the snippet client to remove.
-
- Raises:
- Error: if no snippet client is managed under the specified name.
- """
- if name not in self._snippet_clients:
- raise Error(self._device, MISSING_SNIPPET_CLIENT_MSG % name)
- client = self._snippet_clients.pop(name)
+ def stop(self):
+ """Stops all the snippet clients under management."""
+ for client in self._snippet_clients.values():
+ if client.is_alive:
+ self._device.log.debug('Stopping SnippetClient<%s>.',
+ client.package)
client.stop_app()
+ else:
+ self._device.log.debug(
+ 'Not stopping SnippetClient<%s> because it is not alive.',
+ client.package)
- def start(self):
- """Starts all the snippet clients under management."""
- for client in self._snippet_clients.values():
- if not client.is_alive:
- self._device.log.debug('Starting SnippetClient<%s>.',
- client.package)
- client.start_app_and_connect()
- else:
- self._device.log.debug(
- 'Not startng SnippetClient<%s> because it is already alive.',
- client.package)
+ def pause(self):
+ """Pauses all the snippet clients under management.
- def stop(self):
- """Stops all the snippet clients under management."""
- for client in self._snippet_clients.values():
- if client.is_alive:
- self._device.log.debug('Stopping SnippetClient<%s>.',
- client.package)
- client.stop_app()
- else:
- self._device.log.debug(
- 'Not stopping SnippetClient<%s> because it is not alive.',
- client.package)
+ This clears the host port of a client because a new port will be
+ allocated in `resume`.
+ """
+ for client in self._snippet_clients.values():
+ self._device.log.debug(
+ 'Clearing host port %d of SnippetClient<%s>.',
+ client.host_port, client.package)
+ client.clear_host_port()
- def pause(self):
- """Pauses all the snippet clients under management.
+ def resume(self):
+ """Resumes all paused snippet clients."""
+ for client in self._snippet_clients.values():
+ # Resume is only applicable if a client is alive and does not have
+ # a host port.
+ if client.is_alive and client.host_port is None:
+ self._device.log.debug('Resuming SnippetClient<%s>.',
+ client.package)
+ client.restore_app_connection()
+ else:
+ self._device.log.debug('Not resuming SnippetClient<%s>.',
+ client.package)
- This clears the host port of a client because a new port will be
- allocated in `resume`.
- """
- for client in self._snippet_clients.values():
- self._device.log.debug(
- 'Clearing host port %d of SnippetClient<%s>.',
- client.host_port, client.package)
- client.clear_host_port()
-
- def resume(self):
- """Resumes all paused snippet clients."""
- for client in self._snippet_clients.values():
- # Resume is only applicable if a client is alive and does not have
- # a host port.
- if client.is_alive and client.host_port is None:
- self._device.log.debug('Resuming SnippetClient<%s>.',
- client.package)
- client.restore_app_connection()
- else:
- self._device.log.debug('Not resuming SnippetClient<%s>.',
- client.package)
-
- def __getattr__(self, name):
- client = self.get_snippet_client(name)
- if client:
- return client
- return self.__getattribute__(name)
+ def __getattr__(self, name):
+ client = self.get_snippet_client(name)
+ if client:
+ return client
+ return self.__getattribute__(name)
diff --git a/mobly/controllers/android_device_lib/sl4a_client.py b/mobly/controllers/android_device_lib/sl4a_client.py
index 7b168d9..dd79553 100644
--- a/mobly/controllers/android_device_lib/sl4a_client.py
+++ b/mobly/controllers/android_device_lib/sl4a_client.py
@@ -22,9 +22,9 @@
_APP_NAME = 'SL4A'
_DEVICE_SIDE_PORT = 8080
_LAUNCH_CMD = (
- 'am start -a com.googlecode.android_scripting.action.LAUNCH_SERVER '
- '--ei com.googlecode.android_scripting.extra.USE_SERVICE_PORT %s '
- 'com.googlecode.android_scripting/.activity.ScriptingLayerServiceLauncher')
+ 'am start -a com.googlecode.android_scripting.action.LAUNCH_SERVER '
+ '--ei com.googlecode.android_scripting.extra.USE_SERVICE_PORT %s '
+ 'com.googlecode.android_scripting/.activity.ScriptingLayerServiceLauncher')
# Maximum time to wait for the app to start on the device (10 minutes).
# TODO: This timeout is set high in order to allow for retries in
# start_app_and_connect. Decrease it when the call to connect() has the option
@@ -36,132 +36,132 @@
class Sl4aClient(jsonrpc_client_base.JsonRpcClientBase):
- """A client for interacting with SL4A using Mobly Snippet Lib.
+ """A client for interacting with SL4A using Mobly Snippet Lib.
- Extra public attributes:
- ed: Event dispatcher instance for this sl4a client.
+ Extra public attributes:
+ ed: Event dispatcher instance for this sl4a client.
+ """
+
+ def __init__(self, ad):
+ """Initializes an Sl4aClient.
+
+ Args:
+ ad: AndroidDevice object.
"""
+ super(Sl4aClient, self).__init__(app_name=_APP_NAME, ad=ad)
+ self._ad = ad
+ self.ed = None
+ self._adb = ad.adb
- def __init__(self, ad):
- """Initializes an Sl4aClient.
+ def start_app_and_connect(self):
+ """Overrides superclass."""
+ # Check that sl4a is installed
+ out = self._adb.shell('pm list package')
+ if not utils.grep('com.googlecode.android_scripting', out):
+ raise jsonrpc_client_base.AppStartError(
+ self._ad,
+ '%s is not installed on %s' % (_APP_NAME, self._adb.serial))
+ self.disable_hidden_api_blacklist()
- Args:
- ad: AndroidDevice object.
- """
- super(Sl4aClient, self).__init__(app_name=_APP_NAME, ad=ad)
- self._ad = ad
- self.ed = None
- self._adb = ad.adb
+ # sl4a has problems connecting after disconnection, so kill the apk and
+ # try connecting again.
+ try:
+ self.stop_app()
+ except Exception as e:
+ self.log.warning(e)
- def start_app_and_connect(self):
- """Overrides superclass."""
- # Check that sl4a is installed
- out = self._adb.shell('pm list package')
- if not utils.grep('com.googlecode.android_scripting', out):
- raise jsonrpc_client_base.AppStartError(
- self._ad,
- '%s is not installed on %s' % (_APP_NAME, self._adb.serial))
- self.disable_hidden_api_blacklist()
+ # Launch the app
+ self.device_port = _DEVICE_SIDE_PORT
+ self._adb.shell(_LAUNCH_CMD % self.device_port)
- # sl4a has problems connecting after disconnection, so kill the apk and
- # try connecting again.
+ # Try to start the connection (not restore the connectivity).
+ # The function name restore_app_connection is used here is for the
+ # purpose of reusing the same code as it does when restoring the
+ # connection. And we do not want to come up with another function
+ # name to complicate the API. Change the name if necessary.
+ self.restore_app_connection()
+
+ def restore_app_connection(self, port=None):
+ """Restores the sl4a after device got disconnected.
+
+ Instead of creating new instance of the client:
+ - Uses the given port (or find a new available host_port if none is
+ given).
+ - Tries to connect to remote server with selected port.
+
+ Args:
+ port: If given, this is the host port from which to connect to remote
+ device port. If not provided, find a new available port as host
+ port.
+
+ Raises:
+ AppRestoreConnectionError: When the app was not able to be started.
+ """
+ self.host_port = port or utils.get_available_host_port()
+ self._retry_connect()
+ self.ed = self._start_event_client()
+
+ def stop_app(self):
+ """Overrides superclass."""
+ try:
+ if self._conn:
+ # Be polite; let the dest know we're shutting down.
try:
- self.stop_app()
- except Exception as e:
- self.log.warning(e)
+ self.closeSl4aSession()
+ except:
+ self.log.exception('Failed to gracefully shut down %s.',
+ self.app_name)
- # Launch the app
- self.device_port = _DEVICE_SIDE_PORT
- self._adb.shell(_LAUNCH_CMD % self.device_port)
+ # Close the socket connection.
+ self.disconnect()
+ self.stop_event_dispatcher()
- # Try to start the connection (not restore the connectivity).
- # The function name restore_app_connection is used here is for the
- # purpose of reusing the same code as it does when restoring the
- # connection. And we do not want to come up with another function
- # name to complicate the API. Change the name if necessary.
- self.restore_app_connection()
+ # Terminate the app
+ self._adb.shell('am force-stop com.googlecode.android_scripting')
+ finally:
+ # Always clean up the adb port
+ self.clear_host_port()
- def restore_app_connection(self, port=None):
- """Restores the sl4a after device got disconnected.
+ def stop_event_dispatcher(self):
+ # Close Event Dispatcher
+ if self.ed:
+ try:
+ self.ed.clean_up()
+ except:
+ self.log.exception('Failed to shutdown sl4a event dispatcher.')
+ self.ed = None
- Instead of creating new instance of the client:
- - Uses the given port (or find a new available host_port if none is
- given).
- - Tries to connect to remote server with selected port.
+ def _retry_connect(self):
+ self._adb.forward(
+ ['tcp:%d' % self.host_port,
+ 'tcp:%d' % self.device_port])
+ start_time = time.time()
+ expiration_time = start_time + _APP_START_WAIT_TIME
+ started = False
+ while time.time() < expiration_time:
+ self.log.debug('Attempting to start %s.', self.app_name)
+ try:
+ self.connect()
+ started = True
+ break
+ except:
+ self.log.debug('%s is not yet running, retrying',
+ self.app_name,
+ exc_info=True)
+ time.sleep(1)
+ if not started:
+ raise jsonrpc_client_base.AppRestoreConnectionError(
+ self._ad, '%s failed to connect for %s at host port %s, '
+ 'device port %s' % (self.app_name, self._adb.serial,
+ self.host_port, self.device_port))
- Args:
- port: If given, this is the host port from which to connect to remote
- device port. If not provided, find a new available port as host
- port.
-
- Raises:
- AppRestoreConnectionError: When the app was not able to be started.
- """
- self.host_port = port or utils.get_available_host_port()
- self._retry_connect()
- self.ed = self._start_event_client()
-
- def stop_app(self):
- """Overrides superclass."""
- try:
- if self._conn:
- # Be polite; let the dest know we're shutting down.
- try:
- self.closeSl4aSession()
- except:
- self.log.exception('Failed to gracefully shut down %s.',
- self.app_name)
-
- # Close the socket connection.
- self.disconnect()
- self.stop_event_dispatcher()
-
- # Terminate the app
- self._adb.shell('am force-stop com.googlecode.android_scripting')
- finally:
- # Always clean up the adb port
- self.clear_host_port()
-
- def stop_event_dispatcher(self):
- # Close Event Dispatcher
- if self.ed:
- try:
- self.ed.clean_up()
- except:
- self.log.exception('Failed to shutdown sl4a event dispatcher.')
- self.ed = None
-
- def _retry_connect(self):
- self._adb.forward(
- ['tcp:%d' % self.host_port,
- 'tcp:%d' % self.device_port])
- start_time = time.time()
- expiration_time = start_time + _APP_START_WAIT_TIME
- started = False
- while time.time() < expiration_time:
- self.log.debug('Attempting to start %s.', self.app_name)
- try:
- self.connect()
- started = True
- break
- except:
- self.log.debug('%s is not yet running, retrying',
- self.app_name,
- exc_info=True)
- time.sleep(1)
- if not started:
- raise jsonrpc_client_base.AppRestoreConnectionError(
- self._ad, '%s failed to connect for %s at host port %s, '
- 'device port %s' % (self.app_name, self._adb.serial,
- self.host_port, self.device_port))
-
- def _start_event_client(self):
- # Start an EventDispatcher for the current sl4a session
- event_client = Sl4aClient(self._ad)
- event_client.host_port = self.host_port
- event_client.device_port = self.device_port
- event_client.connect(uid=self.uid,
- cmd=jsonrpc_client_base.JsonRpcCommand.CONTINUE)
- ed = event_dispatcher.EventDispatcher(event_client)
- ed.start()
- return ed
+ def _start_event_client(self):
+ # Start an EventDispatcher for the current sl4a session
+ event_client = Sl4aClient(self._ad)
+ event_client.host_port = self.host_port
+ event_client.device_port = self.device_port
+ event_client.connect(uid=self.uid,
+ cmd=jsonrpc_client_base.JsonRpcCommand.CONTINUE)
+ ed = event_dispatcher.EventDispatcher(event_client)
+ ed.start()
+ return ed
diff --git a/mobly/controllers/android_device_lib/snippet_client.py b/mobly/controllers/android_device_lib/snippet_client.py
index 735a921..aded347 100644
--- a/mobly/controllers/android_device_lib/snippet_client.py
+++ b/mobly/controllers/android_device_lib/snippet_client.py
@@ -24,7 +24,7 @@
from mobly.controllers.android_device_lib import jsonrpc_client_base
_INSTRUMENTATION_RUNNER_PACKAGE = (
- 'com.google.android.mobly.snippet.SnippetRunner')
+ 'com.google.android.mobly.snippet.SnippetRunner')
# Major version of the launch and communication protocol being used by this
# client.
@@ -40,11 +40,11 @@
_PROTOCOL_MINOR_VERSION = 0
_LAUNCH_CMD = (
- '{shell_cmd} am instrument {user} -w -e action start {snippet_package}/' +
- _INSTRUMENTATION_RUNNER_PACKAGE)
+ '{shell_cmd} am instrument {user} -w -e action start {snippet_package}/' +
+ _INSTRUMENTATION_RUNNER_PACKAGE)
_STOP_CMD = ('am instrument {user} -w -e action stop {snippet_package}/' +
- _INSTRUMENTATION_RUNNER_PACKAGE)
+ _INSTRUMENTATION_RUNNER_PACKAGE)
# Test that uses UiAutomation requires the shell session to be maintained while
# test is in progress. However, this requirement does not hold for the test that
@@ -60,327 +60,327 @@
class AppStartPreCheckError(jsonrpc_client_base.Error):
- """Raised when pre checks for the snippet failed."""
+ """Raised when pre checks for the snippet failed."""
class ProtocolVersionError(jsonrpc_client_base.AppStartError):
- """Raised when the protocol reported by the snippet is unknown."""
+ """Raised when the protocol reported by the snippet is unknown."""
class SnippetClient(jsonrpc_client_base.JsonRpcClientBase):
- """A client for interacting with snippet APKs using Mobly Snippet Lib.
+ """A client for interacting with snippet APKs using Mobly Snippet Lib.
- See superclass documentation for a list of public attributes.
+ See superclass documentation for a list of public attributes.
- For a description of the launch protocols, see the documentation in
- mobly-snippet-lib, SnippetRunner.java.
+ For a description of the launch protocols, see the documentation in
+ mobly-snippet-lib, SnippetRunner.java.
+ """
+
+ def __init__(self, package, ad):
+ """Initializes a SnippetClient.
+
+ Args:
+ package: (str) The package name of the apk where the snippets are
+ defined.
+ ad: (AndroidDevice) the device object associated with this client.
"""
+ super(SnippetClient, self).__init__(app_name=package, ad=ad)
+ self.package = package
+ self._ad = ad
+ self._adb = ad.adb
+ self._proc = None
- def __init__(self, package, ad):
- """Initializes a SnippetClient.
+ @property
+ def is_alive(self):
+ """Is the client alive.
- Args:
- package: (str) The package name of the apk where the snippets are
- defined.
- ad: (AndroidDevice) the device object associated with this client.
- """
- super(SnippetClient, self).__init__(app_name=package, ad=ad)
- self.package = package
- self._ad = ad
- self._adb = ad.adb
- self._proc = None
+ The client is considered alive if there is a connection object held for
+ it. This is an approximation due to the following scenario:
- @property
- def is_alive(self):
- """Is the client alive.
+ In the USB disconnect case, the host subprocess that kicked off the
+ snippet apk would die, but the snippet apk itself would continue
+ running on the device.
- The client is considered alive if there is a connection object held for
- it. This is an approximation due to the following scenario:
+ The best approximation we can make is, the connection object has not
+ been explicitly torn down, so the client should be considered alive.
- In the USB disconnect case, the host subprocess that kicked off the
- snippet apk would die, but the snippet apk itself would continue
- running on the device.
+ Returns:
+ True if the client is considered alive, False otherwise.
+ """
+ return self._conn is not None
- The best approximation we can make is, the connection object has not
- been explicitly torn down, so the client should be considered alive.
+ def _get_user_command_string(self):
+ """Gets the appropriate command argument for specifying user IDs.
- Returns:
- True if the client is considered alive, False otherwise.
- """
- return self._conn is not None
+ By default, `SnippetClient` operates within the current user.
- def _get_user_command_string(self):
- """Gets the appropriate command argument for specifying user IDs.
+ We don't add the `--user {ID}` arg when Android's SDK is below 24,
+ where multi-user support is not well implemented.
- By default, `SnippetClient` operates within the current user.
+ Returns:
+ String, the command param section to be formatted into the adb
+ commands.
+ """
+ sdk_int = int(self._ad.build_info['build_version_sdk'])
+ if sdk_int < 24:
+ return ''
+ return '--user %s' % self._adb.current_user_id
- We don't add the `--user {ID}` arg when Android's SDK is below 24,
- where multi-user support is not well implemented.
+ def start_app_and_connect(self):
+ """Starts snippet apk on the device and connects to it.
- Returns:
- String, the command param section to be formatted into the adb
- commands.
- """
- sdk_int = int(self._ad.build_info['build_version_sdk'])
- if sdk_int < 24:
- return ''
- return '--user %s' % self._adb.current_user_id
+ This wraps the main logic with safe handling
- def start_app_and_connect(self):
- """Starts snippet apk on the device and connects to it.
+ Raises:
+ AppStartPreCheckError, when pre-launch checks fail.
+ """
+ try:
+ self._start_app_and_connect()
+ except AppStartPreCheckError:
+ # Precheck errors don't need cleanup, directly raise.
+ raise
+ except Exception as e:
+ # Log the stacktrace of `e` as re-raising doesn't preserve trace.
+ self._ad.log.exception('Failed to start app and connect.')
+ # If errors happen, make sure we clean up before raising.
+ try:
+ self.stop_app()
+ except:
+ self._ad.log.exception(
+ 'Failed to stop app after failure to start and connect.')
+ # Explicitly raise the original error from starting app.
+ raise e
- This wraps the main logic with safe handling
+ def _start_app_and_connect(self):
+ """Starts snippet apk on the device and connects to it.
- Raises:
- AppStartPreCheckError, when pre-launch checks fail.
- """
- try:
- self._start_app_and_connect()
- except AppStartPreCheckError:
- # Precheck errors don't need cleanup, directly raise.
- raise
- except Exception as e:
- # Log the stacktrace of `e` as re-raising doesn't preserve trace.
- self._ad.log.exception('Failed to start app and connect.')
- # If errors happen, make sure we clean up before raising.
- try:
- self.stop_app()
- except:
- self._ad.log.exception(
- 'Failed to stop app after failure to start and connect.')
- # Explicitly raise the original error from starting app.
- raise e
+ After prechecks, this launches the snippet apk with an adb cmd in a
+ standing subprocess, checks the cmd response from the apk for protocol
+ version, then sets up the socket connection over adb port-forwarding.
- def _start_app_and_connect(self):
- """Starts snippet apk on the device and connects to it.
+ Args:
+ ProtocolVersionError, if protocol info or port info cannot be
+ retrieved from the snippet apk.
+ """
+ self._check_app_installed()
+ self.disable_hidden_api_blacklist()
- After prechecks, this launches the snippet apk with an adb cmd in a
- standing subprocess, checks the cmd response from the apk for protocol
- version, then sets up the socket connection over adb port-forwarding.
+ persists_shell_cmd = self._get_persist_command()
+ # Use info here so people can follow along with the snippet startup
+ # process. Starting snippets can be slow, especially if there are
+ # multiple, and this avoids the perception that the framework is hanging
+ # for a long time doing nothing.
+ self.log.info('Launching snippet apk %s with protocol %d.%d',
+ self.package, _PROTOCOL_MAJOR_VERSION,
+ _PROTOCOL_MINOR_VERSION)
+ cmd = _LAUNCH_CMD.format(shell_cmd=persists_shell_cmd,
+ user=self._get_user_command_string(),
+ snippet_package=self.package)
+ start_time = time.time()
+ self._proc = self._do_start_app(cmd)
- Args:
- ProtocolVersionError, if protocol info or port info cannot be
- retrieved from the snippet apk.
- """
- self._check_app_installed()
- self.disable_hidden_api_blacklist()
+ # Check protocol version and get the device port
+ line = self._read_protocol_line()
+ match = re.match('^SNIPPET START, PROTOCOL ([0-9]+) ([0-9]+)$', line)
+ if not match or match.group(1) != '1':
+ raise ProtocolVersionError(self._ad, line)
- persists_shell_cmd = self._get_persist_command()
- # Use info here so people can follow along with the snippet startup
- # process. Starting snippets can be slow, especially if there are
- # multiple, and this avoids the perception that the framework is hanging
- # for a long time doing nothing.
- self.log.info('Launching snippet apk %s with protocol %d.%d',
- self.package, _PROTOCOL_MAJOR_VERSION,
- _PROTOCOL_MINOR_VERSION)
- cmd = _LAUNCH_CMD.format(shell_cmd=persists_shell_cmd,
- user=self._get_user_command_string(),
- snippet_package=self.package)
- start_time = time.time()
- self._proc = self._do_start_app(cmd)
+ line = self._read_protocol_line()
+ match = re.match('^SNIPPET SERVING, PORT ([0-9]+)$', line)
+ if not match:
+ raise ProtocolVersionError(self._ad, line)
+ self.device_port = int(match.group(1))
- # Check protocol version and get the device port
- line = self._read_protocol_line()
- match = re.match('^SNIPPET START, PROTOCOL ([0-9]+) ([0-9]+)$', line)
- if not match or match.group(1) != '1':
- raise ProtocolVersionError(self._ad, line)
+ # Forward the device port to a new host port, and connect to that port
+ self.host_port = utils.get_available_host_port()
+ self._adb.forward(
+ ['tcp:%d' % self.host_port,
+ 'tcp:%d' % self.device_port])
+ self.connect()
- line = self._read_protocol_line()
- match = re.match('^SNIPPET SERVING, PORT ([0-9]+)$', line)
- if not match:
- raise ProtocolVersionError(self._ad, line)
- self.device_port = int(match.group(1))
+ # Yaaay! We're done!
+ self.log.debug('Snippet %s started after %.1fs on host port %s',
+ self.package,
+ time.time() - start_time, self.host_port)
- # Forward the device port to a new host port, and connect to that port
- self.host_port = utils.get_available_host_port()
- self._adb.forward(
- ['tcp:%d' % self.host_port,
- 'tcp:%d' % self.device_port])
- self.connect()
+ def restore_app_connection(self, port=None):
+ """Restores the app after device got reconnected.
- # Yaaay! We're done!
- self.log.debug('Snippet %s started after %.1fs on host port %s',
- self.package,
- time.time() - start_time, self.host_port)
+ Instead of creating new instance of the client:
+ - Uses the given port (or find a new available host_port if none is
+ given).
+ - Tries to connect to remote server with selected port.
- def restore_app_connection(self, port=None):
- """Restores the app after device got reconnected.
+ Args:
+ port: If given, this is the host port from which to connect to remote
+ device port. If not provided, find a new available port as host
+ port.
- Instead of creating new instance of the client:
- - Uses the given port (or find a new available host_port if none is
- given).
- - Tries to connect to remote server with selected port.
+ Raises:
+ AppRestoreConnectionError: When the app was not able to be started.
+ """
+ self.host_port = port or utils.get_available_host_port()
+ self._adb.forward(
+ ['tcp:%d' % self.host_port,
+ 'tcp:%d' % self.device_port])
+ try:
+ self.connect()
+ except:
+ # Log the original error and raise AppRestoreConnectionError.
+ self.log.exception('Failed to re-connect to app.')
+ raise jsonrpc_client_base.AppRestoreConnectionError(
+ self._ad,
+ ('Failed to restore app connection for %s at host port %s, '
+ 'device port %s') %
+ (self.package, self.host_port, self.device_port))
- Args:
- port: If given, this is the host port from which to connect to remote
- device port. If not provided, find a new available port as host
- port.
+ # Because the previous connection was lost, update self._proc
+ self._proc = None
+ self._restore_event_client()
- Raises:
- AppRestoreConnectionError: When the app was not able to be started.
- """
- self.host_port = port or utils.get_available_host_port()
- self._adb.forward(
- ['tcp:%d' % self.host_port,
- 'tcp:%d' % self.device_port])
- try:
- self.connect()
- except:
- # Log the original error and raise AppRestoreConnectionError.
- self.log.exception('Failed to re-connect to app.')
- raise jsonrpc_client_base.AppRestoreConnectionError(
- self._ad,
- ('Failed to restore app connection for %s at host port %s, '
- 'device port %s') %
- (self.package, self.host_port, self.device_port))
+ def stop_app(self):
+ # Kill the pending 'adb shell am instrument -w' process if there is one.
+ # Although killing the snippet apk would abort this process anyway, we
+ # want to call stop_standing_subprocess() to perform a health check,
+ # print the failure stack trace if there was any, and reap it from the
+ # process table.
+ self.log.debug('Stopping snippet apk %s', self.package)
+ try:
+ # Close the socket connection.
+ self.disconnect()
+ if self._proc:
+ utils.stop_standing_subprocess(self._proc)
+ self._proc = None
+ out = self._adb.shell(
+ _STOP_CMD.format(
+ snippet_package=self.package,
+ user=self._get_user_command_string())).decode('utf-8')
+ if 'OK (0 tests)' not in out:
+ raise errors.DeviceError(
+ self._ad,
+ 'Failed to stop existing apk. Unexpected output: %s' % out)
+ finally:
+ # Always clean up the adb port
+ self.clear_host_port()
- # Because the previous connection was lost, update self._proc
- self._proc = None
- self._restore_event_client()
+ def _start_event_client(self):
+ """Overrides superclass."""
+ event_client = SnippetClient(package=self.package, ad=self._ad)
+ event_client.host_port = self.host_port
+ event_client.device_port = self.device_port
+ event_client.connect(self.uid,
+ jsonrpc_client_base.JsonRpcCommand.CONTINUE)
+ return event_client
- def stop_app(self):
- # Kill the pending 'adb shell am instrument -w' process if there is one.
- # Although killing the snippet apk would abort this process anyway, we
- # want to call stop_standing_subprocess() to perform a health check,
- # print the failure stack trace if there was any, and reap it from the
- # process table.
- self.log.debug('Stopping snippet apk %s', self.package)
- try:
- # Close the socket connection.
- self.disconnect()
- if self._proc:
- utils.stop_standing_subprocess(self._proc)
- self._proc = None
- out = self._adb.shell(
- _STOP_CMD.format(
- snippet_package=self.package,
- user=self._get_user_command_string())).decode('utf-8')
- if 'OK (0 tests)' not in out:
- raise errors.DeviceError(
- self._ad,
- 'Failed to stop existing apk. Unexpected output: %s' % out)
- finally:
- # Always clean up the adb port
- self.clear_host_port()
+ def _restore_event_client(self):
+ """Restores previously created event client."""
+ if not self._event_client:
+ self._event_client = self._start_event_client()
+ return
+ self._event_client.host_port = self.host_port
+ self._event_client.device_port = self.device_port
+ self._event_client.connect()
- def _start_event_client(self):
- """Overrides superclass."""
- event_client = SnippetClient(package=self.package, ad=self._ad)
- event_client.host_port = self.host_port
- event_client.device_port = self.device_port
- event_client.connect(self.uid,
- jsonrpc_client_base.JsonRpcCommand.CONTINUE)
- return event_client
+ def _check_app_installed(self):
+ # Check that the Mobly Snippet app is installed for the current user.
+ user_id = self._adb.current_user_id
+ out = self._adb.shell('pm list package --user %s' % user_id)
+ if not utils.grep('^package:%s$' % self.package, out):
+ raise AppStartPreCheckError(
+ self._ad,
+ '%s is not installed for user %s.' % (self.package, user_id))
+ # Check that the app is instrumented.
+ out = self._adb.shell('pm list instrumentation')
+ matched_out = utils.grep(
+ '^instrumentation:%s/%s' %
+ (self.package, _INSTRUMENTATION_RUNNER_PACKAGE), out)
+ if not matched_out:
+ raise AppStartPreCheckError(
+ self._ad,
+ '%s is installed, but it is not instrumented.' % self.package)
+ match = re.search(r'^instrumentation:(.*)\/(.*) \(target=(.*)\)$',
+ matched_out[0])
+ target_name = match.group(3)
+ # Check that the instrumentation target is installed if it's not the
+ # same as the snippet package.
+ if target_name != self.package:
+ out = self._adb.shell('pm list package --user %s' % user_id)
+ if not utils.grep('^package:%s$' % target_name, out):
+ raise AppStartPreCheckError(
+ self._ad,
+ 'Instrumentation target %s is not installed for user %s.' %
+ (target_name, user_id))
- def _restore_event_client(self):
- """Restores previously created event client."""
- if not self._event_client:
- self._event_client = self._start_event_client()
- return
- self._event_client.host_port = self.host_port
- self._event_client.device_port = self.device_port
- self._event_client.connect()
+ def _do_start_app(self, launch_cmd):
+ adb_cmd = [adb.ADB]
+ if self._adb.serial:
+ adb_cmd += ['-s', self._adb.serial]
+ adb_cmd += ['shell', launch_cmd]
+ return utils.start_standing_subprocess(adb_cmd, shell=False)
- def _check_app_installed(self):
- # Check that the Mobly Snippet app is installed for the current user.
- user_id = self._adb.current_user_id
- out = self._adb.shell('pm list package --user %s' % user_id)
- if not utils.grep('^package:%s$' % self.package, out):
- raise AppStartPreCheckError(
- self._ad,
- '%s is not installed for user %s.' % (self.package, user_id))
- # Check that the app is instrumented.
- out = self._adb.shell('pm list instrumentation')
- matched_out = utils.grep(
- '^instrumentation:%s/%s' %
- (self.package, _INSTRUMENTATION_RUNNER_PACKAGE), out)
- if not matched_out:
- raise AppStartPreCheckError(
- self._ad,
- '%s is installed, but it is not instrumented.' % self.package)
- match = re.search(r'^instrumentation:(.*)\/(.*) \(target=(.*)\)$',
- matched_out[0])
- target_name = match.group(3)
- # Check that the instrumentation target is installed if it's not the
- # same as the snippet package.
- if target_name != self.package:
- out = self._adb.shell('pm list package --user %s' % user_id)
- if not utils.grep('^package:%s$' % target_name, out):
- raise AppStartPreCheckError(
- self._ad,
- 'Instrumentation target %s is not installed for user %s.' %
- (target_name, user_id))
+ def _read_protocol_line(self):
+ """Reads the next line of instrumentation output relevant to snippets.
- def _do_start_app(self, launch_cmd):
- adb_cmd = [adb.ADB]
- if self._adb.serial:
- adb_cmd += ['-s', self._adb.serial]
- adb_cmd += ['shell', launch_cmd]
- return utils.start_standing_subprocess(adb_cmd, shell=False)
+ This method will skip over lines that don't start with 'SNIPPET' or
+ 'INSTRUMENTATION_RESULT'.
- def _read_protocol_line(self):
- """Reads the next line of instrumentation output relevant to snippets.
+ Returns:
+ (str) Next line of snippet-related instrumentation output, stripped.
- This method will skip over lines that don't start with 'SNIPPET' or
- 'INSTRUMENTATION_RESULT'.
+ Raises:
+ jsonrpc_client_base.AppStartError: If EOF is reached without any
+ protocol lines being read.
+ """
+ while True:
+ line = self._proc.stdout.readline().decode('utf-8')
+ if not line:
+ raise jsonrpc_client_base.AppStartError(
+ self._ad, 'Unexpected EOF waiting for app to start')
+ # readline() uses an empty string to mark EOF, and a single newline
+ # to mark regular empty lines in the output. Don't move the strip()
+ # call above the truthiness check, or this method will start
+ # considering any blank output line to be EOF.
+ line = line.strip()
+ if (line.startswith('INSTRUMENTATION_RESULT:')
+ or line.startswith('SNIPPET ')):
+ self.log.debug(
+ 'Accepted line from instrumentation output: "%s"', line)
+ return line
+ self.log.debug('Discarded line from instrumentation output: "%s"',
+ line)
- Returns:
- (str) Next line of snippet-related instrumentation output, stripped.
+ def _get_persist_command(self):
+ """Check availability and return path of command if available."""
+ for command in [_SETSID_COMMAND, _NOHUP_COMMAND]:
+ try:
+ if command in self._adb.shell(['which',
+ command]).decode('utf-8'):
+ return command
+ except adb.AdbError:
+ continue
+ self.log.warning(
+ 'No %s and %s commands available to launch instrument '
+ 'persistently, tests that depend on UiAutomator and '
+ 'at the same time performs USB disconnection may fail',
+ _SETSID_COMMAND, _NOHUP_COMMAND)
+ return ''
- Raises:
- jsonrpc_client_base.AppStartError: If EOF is reached without any
- protocol lines being read.
- """
- while True:
- line = self._proc.stdout.readline().decode('utf-8')
- if not line:
- raise jsonrpc_client_base.AppStartError(
- self._ad, 'Unexpected EOF waiting for app to start')
- # readline() uses an empty string to mark EOF, and a single newline
- # to mark regular empty lines in the output. Don't move the strip()
- # call above the truthiness check, or this method will start
- # considering any blank output line to be EOF.
- line = line.strip()
- if (line.startswith('INSTRUMENTATION_RESULT:')
- or line.startswith('SNIPPET ')):
- self.log.debug(
- 'Accepted line from instrumentation output: "%s"', line)
- return line
- self.log.debug('Discarded line from instrumentation output: "%s"',
- line)
+ def help(self, print_output=True):
+ """Calls the help RPC, which returns the list of RPC calls available.
- def _get_persist_command(self):
- """Check availability and return path of command if available."""
- for command in [_SETSID_COMMAND, _NOHUP_COMMAND]:
- try:
- if command in self._adb.shell(['which',
- command]).decode('utf-8'):
- return command
- except adb.AdbError:
- continue
- self.log.warning(
- 'No %s and %s commands available to launch instrument '
- 'persistently, tests that depend on UiAutomator and '
- 'at the same time performs USB disconnection may fail',
- _SETSID_COMMAND, _NOHUP_COMMAND)
- return ''
+ This RPC should normally be used in an interactive console environment
+ where the output should be printed instead of returned. Otherwise,
+ newlines will be escaped, which will make the output difficult to read.
- def help(self, print_output=True):
- """Calls the help RPC, which returns the list of RPC calls available.
+ Args:
+ print_output: A bool for whether the output should be printed.
- This RPC should normally be used in an interactive console environment
- where the output should be printed instead of returned. Otherwise,
- newlines will be escaped, which will make the output difficult to read.
-
- Args:
- print_output: A bool for whether the output should be printed.
-
- Returns:
- A str containing the help output otherwise None if print_output
- wasn't set.
- """
- help_text = self._rpc('help')
- if print_output:
- print(help_text)
- else:
- return help_text
+ Returns:
+ A str containing the help output otherwise None if print_output
+ wasn't set.
+ """
+ help_text = self._rpc('help')
+ if print_output:
+ print(help_text)
+ else:
+ return help_text
diff --git a/mobly/controllers/android_device_lib/snippet_event.py b/mobly/controllers/android_device_lib/snippet_event.py
index c717765..9a87215 100644
--- a/mobly/controllers/android_device_lib/snippet_event.py
+++ b/mobly/controllers/android_device_lib/snippet_event.py
@@ -14,38 +14,38 @@
def from_dict(event_dict):
- """Create a SnippetEvent object from a dictionary.
+ """Create a SnippetEvent object from a dictionary.
- Args:
- event_dict: a dictionary representing an event.
+ Args:
+ event_dict: a dictionary representing an event.
- Returns:
- A SnippetEvent object.
- """
- return SnippetEvent(callback_id=event_dict['callbackId'],
- name=event_dict['name'],
- creation_time=event_dict['time'],
- data=event_dict['data'])
+ Returns:
+ A SnippetEvent object.
+ """
+ return SnippetEvent(callback_id=event_dict['callbackId'],
+ name=event_dict['name'],
+ creation_time=event_dict['time'],
+ data=event_dict['data'])
class SnippetEvent(object):
- """The class that represents callback events for mobly snippet library.
+ """The class that represents callback events for mobly snippet library.
- Attributes:
- callback_id: string, the callback ID associated with the event.
- name: string, the name of the event.
- creation_time: int, the epoch time when the event is created on the
- Rpc server side.
- data: dictionary, the data held by the event. Can be None.
- """
+ Attributes:
+ callback_id: string, the callback ID associated with the event.
+ name: string, the name of the event.
+ creation_time: int, the epoch time when the event is created on the
+ Rpc server side.
+ data: dictionary, the data held by the event. Can be None.
+ """
- def __init__(self, callback_id, name, creation_time, data):
- self.callback_id = callback_id
- self.name = name
- self.creation_time = creation_time
- self.data = data
+ def __init__(self, callback_id, name, creation_time, data):
+ self.callback_id = callback_id
+ self.name = name
+ self.creation_time = creation_time
+ self.data = data
- def __repr__(self):
- return ('SnippetEvent(callback_id: %s, name: %s, creation_time: %s, '
- 'data: %s)') % (self.callback_id, self.name,
- self.creation_time, self.data)
+ def __repr__(self):
+ return ('SnippetEvent(callback_id: %s, name: %s, creation_time: %s, '
+ 'data: %s)') % (self.callback_id, self.name,
+ self.creation_time, self.data)