blob: 646833e691c7ddcd0aac82cfc0b9991bd33a2183 [file] [log] [blame]
#!/usr/bin/env fuchsia-vendored-python
# Copyright 2024 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""SystemPowerStateController affordance implementation using sysfs."""
import enum
import logging
import os
import pty
import re
import subprocess
import time
from typing import Any
from honeydew import errors
from honeydew.interfaces.affordances import (
system_power_state_controller as system_power_state_controller_interface,
)
from honeydew.interfaces.device_classes import affordances_capable
from honeydew.transports import ffx as ffx_transport
from honeydew.typing import custom_types
class _StarnixCmds:
"""Class to hold Starnix commands."""
PREFIX: list[str] = [
"starnix",
"console",
"/bin/sh",
"-c",
]
IDLE_SUSPEND: list[str] = [
"echo -n mem > /sys/power/state",
]
IS_STARNIX_SUPPORTED: list[str] = [
"echo hello",
]
class _Timeouts(enum.IntEnum):
"""Class to hold the timeouts."""
STARNIX_CMD = 15
FFX_LOGS_CMD = 60
class _RegExPatterns:
"""Class to hold Regular Expression patterns."""
STARNIX_CMD_SUCCESS: re.Pattern[str] = re.compile(r"(exit code: 0)")
STARNIX_NOT_SUPPORTED: re.Pattern[str] = re.compile(
r"Unable to find Starnix container in the session"
)
SUSPEND_OPERATION: re.Pattern[str] = re.compile(
r"\[(\d+).\d+\].*?\[system-activity-governor\].+?Suspending"
)
RESUME_OPERATION: re.Pattern[str] = re.compile(
r"\[(\d+?).\d+?\].*?\[system-activity-governor\].+?Resuming.+?Ok"
)
HONEYDEW_SUSPEND_RESUME_START: re.Pattern[str] = re.compile(
r"\[lacewing\].*?\[Host Time: (.*?)\].*?Performing.*?Suspend.*?followed by.*?Resume.*?operations"
)
HONEYDEW_SUSPEND_RESUME_END: re.Pattern[str] = re.compile(
r"\[lacewing\].*?\[Host Time: (.*?)\].*?Completed.*?Suspend.*?followed by.*?Resume.*?operations.*?in (\d+?.\d+?) seconds"
)
SUSPEND_RESUME_PATTERNS: list[re.Pattern[str]] = [
HONEYDEW_SUSPEND_RESUME_START,
SUSPEND_OPERATION,
RESUME_OPERATION,
HONEYDEW_SUSPEND_RESUME_END,
]
_MAX_READ_SIZE: int = 1024
_LOGGER: logging.Logger = logging.getLogger(__name__)
class SystemPowerStateController(
system_power_state_controller_interface.SystemPowerStateController
):
"""SystemPowerStateController affordance implementation using sysfs.
Args:
device_name: Device name returned by `ffx target list`.
ffx: FFX transport.
Raises:
errors.NotSupportedError: If Fuchsia device does not support Starnix.
"""
def __init__(
self,
device_name: str,
ffx: ffx_transport.FFX,
device_logger: affordances_capable.FuchsiaDeviceLogger,
) -> None:
self._device_name: str = device_name
self._ffx: ffx_transport.FFX = ffx
self._device_logger: affordances_capable.FuchsiaDeviceLogger = (
device_logger
)
_LOGGER.debug(
"Checking if %s supports %s affordance...",
self._device_name,
self.__class__.__name__,
)
self._run_starnix_console_shell_cmd(
cmd=_StarnixCmds.IS_STARNIX_SUPPORTED
)
_LOGGER.debug(
"%s does support %s affordance...",
self._device_name,
self.__class__.__name__,
)
# List all the public methods
def suspend_resume(
self,
suspend_state: system_power_state_controller_interface.SuspendState,
resume_mode: system_power_state_controller_interface.ResumeMode,
verify: bool = True,
) -> None:
"""Perform suspend-resume operation on the device.
This is a synchronous operation on the device and thus this call will be
hanged until resume operation finishes.
Args:
suspend_state: Which state to suspend the Fuchsia device into.
resume_mode: Information about how to resume the device.
verify: Whether or not to verify if suspend-resume operation
performed successfully. Optional and default is True. Note that
this raises SystemPowerStateControllerError if verification
fails.
Raises:
errors.SystemPowerStateControllerError: In case of failure
errors.NotSupportedError: If any of the suspend_state or resume_type
is not yet supported
"""
logs_start_time: float = time.time()
log_message: str = (
f"Performing '{suspend_state}' followed by '{resume_mode}' "
f"operations on '{self._device_name}'..."
)
_LOGGER.info(log_message)
self._device_logger.log_message_to_device(
message=log_message,
level=custom_types.LEVEL.INFO,
)
suspend_resume_start_time: float = time.time()
if isinstance(
resume_mode, system_power_state_controller_interface.AutomaticResume
):
pass
# Device will resume automatically
else:
raise errors.NotSupportedError(
f"Resuming the device using '{resume_mode}' is not yet supported."
)
if isinstance(
suspend_state, system_power_state_controller_interface.IdleSuspend
):
self._perform_idle_suspend()
else:
raise errors.NotSupportedError(
f"Suspending the device to '{suspend_state}' state is not yet "
f"supported."
)
suspend_resume_end_time: float = time.time()
suspend_resume_duration: float = (
suspend_resume_end_time - suspend_resume_start_time
)
log_message = (
f"Completed '{suspend_state}' followed by '{resume_mode}' "
f"operations on '{self._device_name}' in {suspend_resume_duration} "
f"seconds."
)
self._device_logger.log_message_to_device(
message=log_message,
level=custom_types.LEVEL.INFO,
)
_LOGGER.info(log_message)
logs_end_time: float = time.time()
logs_duration: float = logs_end_time - logs_start_time
if verify:
self._verify_suspend_resume(
suspend_state,
resume_mode,
suspend_resume_duration,
logs_duration,
)
def idle_suspend_auto_resume(self, verify: bool = True) -> None:
"""Perform idle-suspend and auto-resume operation on the device.
Args:
verify: Whether or not to verify if suspend-resume operation
performed successfully. Optional and default is True.
Raises:
errors.SystemPowerStateControllerError: In case of failure
"""
self.suspend_resume(
suspend_state=system_power_state_controller_interface.IdleSuspend(),
resume_mode=system_power_state_controller_interface.AutomaticResume(),
verify=verify,
)
# List all the private methods
def _perform_idle_suspend(self) -> None:
"""Perform Idle mode suspend operation.
Raises:
errors.SystemPowerStateControllerError: In case of failure.
"""
try:
self._run_starnix_console_shell_cmd(
cmd=_StarnixCmds.IDLE_SUSPEND, timeout=None
)
except Exception as err: # pylint: disable=broad-except
raise errors.SystemPowerStateControllerError(
f"Failed to put {self._device_name} into idle-suspend mode"
) from err
def _run_starnix_console_shell_cmd(
self, cmd: list[str], timeout: float | None = _Timeouts.STARNIX_CMD
) -> str:
"""Run a starnix console command and return its output.
Args:
cmd: cmd that need to be run excluding `starnix /bin/sh -c`.
timeout: command timeout.
Returns:
Output of `ffx -t {target} starnix /bin/sh -c {cmd}`.
Raises:
errors.StarnixError: In case of starnix command failure.
errors.NotSupportedError: If Fuchsia device does not support Starnix.
subprocess.TimeoutExpired: In case of command timeout.
"""
# starnix console requires the process to run in tty:
host_fd: int
child_fd: int
host_fd, child_fd = pty.openpty()
starnix_cmd: list[str] = _StarnixCmds.PREFIX + cmd
starnix_cmd_str: str = " ".join(starnix_cmd)
process: subprocess.Popen[Any] = self._ffx.popen(
cmd=starnix_cmd,
stdin=child_fd,
stdout=child_fd,
stderr=child_fd,
)
process.wait(timeout)
# Note: This call may sometime return less chars than _MAX_READ_SIZE
# even when command output contains more chars. This happened with
# `getprop` command output but not with suspend-resume related
# operations. So consider exploring better ways to read command output
# such that this method can be used with other starnix console commands
output: str = os.read(host_fd, _MAX_READ_SIZE).decode("utf-8")
_LOGGER.debug(
"Starnix console cmd `%s` completed. returncode=%s, output:\n%s",
starnix_cmd_str,
process.returncode,
output,
)
if _RegExPatterns.STARNIX_CMD_SUCCESS.search(output):
return output
elif _RegExPatterns.STARNIX_NOT_SUPPORTED.search(output):
board: str | None = None
product: str | None = None
try:
board = self._ffx.get_target_board()
product = self._ffx.get_target_product()
except Exception: # pylint: disable=broad-except
pass
error_msg: str
if board and product:
error_msg = (
f"{self._device_name} running {product}.{board} does not "
f"support Starnix"
)
else:
error_msg = f"{self._device_name} does not support Starnix"
raise errors.NotSupportedError(error_msg)
else:
raise errors.StarnixError(
f"Starnix console cmd `{starnix_cmd_str}` failed. (See debug "
"logs for command output)"
)
def _verify_suspend_resume(
self,
suspend_state: system_power_state_controller_interface.SuspendState,
resume_mode: system_power_state_controller_interface.ResumeMode,
suspend_resume_duration: float,
logs_duration: float,
) -> None:
"""Verifies suspend resume operation has been indeed performed
correctly.
Args:
suspend_state: Which state to suspend the Fuchsia device into.
resume_mode: Information about how to resume the device.
suspend_resume_duration: How long suspend-resume operation took.
logs_duration: How many seconds of logs need to be captured for
the log analysis.
Raises:
errors.SystemPowerStateControllerError: In case of verification
failure.
"""
_LOGGER.info(
"Verifying the '%s' followed by '%s' operations that were "
"performed on '%s'...",
suspend_state,
resume_mode,
self._device_name,
)
self._verify_suspend_resume_using_duration(
suspend_state=suspend_state,
resume_mode=resume_mode,
suspend_resume_duration=suspend_resume_duration,
)
# TODO (https://fxbug.dev/335494603): Use inspect based verification
self._verify_suspend_resume_using_log_analysis(
suspend_state=suspend_state,
resume_mode=resume_mode,
logs_duration=logs_duration,
)
_LOGGER.info(
"Successfully verified the '%s' followed by '%s' operations that "
"were performed on '%s'",
suspend_state,
resume_mode,
self._device_name,
)
def _verify_suspend_resume_using_duration(
self,
suspend_state: system_power_state_controller_interface.SuspendState,
resume_mode: system_power_state_controller_interface.ResumeMode,
suspend_resume_duration: float,
) -> None:
"""Verify that suspend-resume operation was indeed triggered by checking
the duration it took to perform suspend-resume operation.
Args:
suspend_state: Which state to suspend the Fuchsia device into.
resume_mode: Information about how to resume the device.
suspend_resume_duration: How long suspend-resume operation took.
Raises:
errors.SystemPowerStateControllerError: In case of verification
failure.
"""
if isinstance(
resume_mode, system_power_state_controller_interface.AutomaticResume
):
buffer_duration: float = 3
max_expected_duration: float = (
resume_mode.duration + buffer_duration
)
if (
suspend_resume_duration < resume_mode.duration
or suspend_resume_duration > max_expected_duration
):
raise errors.SystemPowerStateControllerError(
f"'{suspend_state}' followed by '{resume_mode}' operation "
f"took {suspend_resume_duration} seconds on "
f"'{self._device_name}'. Expected duration "
f"range: [{resume_mode.duration}, {max_expected_duration}] "
f"seconds.",
)
def _verify_suspend_resume_using_log_analysis(
self,
suspend_state: system_power_state_controller_interface.SuspendState,
resume_mode: system_power_state_controller_interface.ResumeMode,
logs_duration: float,
) -> None:
"""Verify that suspend resume operation was indeed triggered using log
analysis.
Args:
suspend_state: Which state to suspend the Fuchsia device into.
resume_mode: Information about how to resume the device.
logs_duration: How many seconds of logs need to be captured for
the log analysis.
"""
duration: int = round(logs_duration) + 1
suspend_resume_logs_cmd: list[str] = [
"log",
"--symbolize",
"off", # Turn off symbolize to run this cmd in infra
"--filter",
"remote-control", # Lacewing logs
"--filter",
"system-activity-governor", # suspend-resume logs
"--since",
f"{duration}s ago",
"dump",
]
suspend_resume_logs: list[str] = self._ffx.run(
suspend_resume_logs_cmd,
timeout=_Timeouts.FFX_LOGS_CMD,
).split("\n")
suspend_time: int = 0
resume_time: int = 0
expected_patterns: list[
re.Pattern[str]
] = _RegExPatterns.SUSPEND_RESUME_PATTERNS.copy()
for suspend_resume_log in suspend_resume_logs:
reg_ex: re.Pattern[str] = expected_patterns[0]
match: re.Match[str] | None = reg_ex.search(suspend_resume_log)
if match:
if reg_ex == _RegExPatterns.SUSPEND_OPERATION:
suspend_time = int(match.group(1))
elif reg_ex == _RegExPatterns.RESUME_OPERATION:
resume_time = int(match.group(1))
expected_patterns.remove(reg_ex)
if not expected_patterns:
break
reg_ex = expected_patterns[0]
if expected_patterns:
raise errors.SystemPowerStateControllerError(
f"Log analysis for '{suspend_state}' followed by "
f"'{resume_mode}' operation failed on '{self._device_name}'. "
f"Following patterns were not found: {expected_patterns}"
)
suspend_resume_duration: int = resume_time - suspend_time
self._verify_suspend_resume_using_duration(
suspend_state=suspend_state,
resume_mode=resume_mode,
suspend_resume_duration=suspend_resume_duration,
)