blob: 90030f0e61c63b1a56d5a0b04e443927ca4ea13b [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2022 The Fuchsia Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# TODO(http://b/259746643): Remove this file once we no longer rely on antlion for
# flashing the device. This should be the responsibility of the person or software
# dispatching antlion; removing flashing from antlion increases opportunities for
# runtime optimization and increases device lifetime.
import logging
import os
import tarfile
import tempfile
import time
from antlion import utils
from antlion.libs.proc import job
from antlion.utils import get_fuchsia_mdns_ipv6_address
MDNS_LOOKUP_RETRY_MAX = 3
FASTBOOT_TIMEOUT = 30
FLASH_TIMEOUT_SEC = 60 * 5 # 5 minutes
AFTER_FLASH_BOOT_TIME = 30
WAIT_FOR_EXISTING_FLASH_TO_FINISH_SEC = 360
PROCESS_CHECK_WAIT_TIME_SEC = 30
FUCHSIA_SDK_URL = "gs://fuchsia-sdk/development"
FUCHSIA_RELEASE_TESTING_URL = "gs://fuchsia-release-testing/images"
def flash(fuchsia_device, use_ssh=False, fuchsia_reconnect_after_reboot_time=5):
"""A function to flash, not pave, a fuchsia_device
Args:
fuchsia_device: An ACTS fuchsia_device
Returns:
True if successful.
"""
if not fuchsia_device.authorized_file:
raise ValueError(
"A ssh authorized_file must be present in the "
"ACTS config to flash fuchsia_devices."
)
# This is the product type from the fx set command.
# Do 'fx list-products' to see options in Fuchsia source tree.
if not fuchsia_device.product_type:
raise ValueError(
"A product type must be specified to flash " "fuchsia_devices."
)
# This is the board type from the fx set command.
# Do 'fx list-boards' to see options in Fuchsia source tree.
if not fuchsia_device.board_type:
raise ValueError("A board type must be specified to flash " "fuchsia_devices.")
if not fuchsia_device.build_number:
fuchsia_device.build_number = "LATEST"
if not fuchsia_device.mdns_name:
raise ValueError(
"Either fuchsia_device mdns_name must be specified or "
"ip must be the mDNS name to be able to flash."
)
file_to_download = None
image_archive_path = None
image_path = None
if not fuchsia_device.specific_image:
product_build = fuchsia_device.product_type
if fuchsia_device.build_type:
product_build = f"{product_build}_{fuchsia_device.build_type}"
if "LATEST" in fuchsia_device.build_number:
sdk_version = "sdk"
if "LATEST_F" in fuchsia_device.build_number:
f_branch = fuchsia_device.build_number.split("LATEST_F", 1)[1]
sdk_version = f"f{f_branch}_sdk"
file_to_download = (
f"{FUCHSIA_RELEASE_TESTING_URL}/"
f"{sdk_version}-{product_build}.{fuchsia_device.board_type}-release.tgz"
)
else:
# Must be a fully qualified build number (e.g. 5.20210721.4.1215)
file_to_download = (
f"{FUCHSIA_SDK_URL}/{fuchsia_device.build_number}/images/"
f"{product_build}.{fuchsia_device.board_type}-release.tgz"
)
elif "gs://" in fuchsia_device.specific_image:
file_to_download = fuchsia_device.specific_image
elif os.path.isdir(fuchsia_device.specific_image):
image_path = fuchsia_device.specific_image
elif tarfile.is_tarfile(fuchsia_device.specific_image):
image_archive_path = fuchsia_device.specific_image
else:
raise ValueError(f'Invalid specific_image "{fuchsia_device.specific_image}"')
if image_path:
reboot_to_bootloader(
fuchsia_device, use_ssh, fuchsia_reconnect_after_reboot_time
)
logging.info(
f'Flashing {fuchsia_device.mdns_name} with {image_path} using authorized keys "{fuchsia_device.authorized_file}".'
)
run_flash_script(fuchsia_device, image_path)
else:
suffix = fuchsia_device.board_type
with tempfile.TemporaryDirectory(suffix=suffix) as image_path:
if file_to_download:
logging.info(f"Downloading {file_to_download} to {image_path}")
job.run(f"gsutil cp {file_to_download} {image_path}")
image_archive_path = os.path.join(
image_path, os.path.basename(file_to_download)
)
if image_archive_path:
# Use tar command instead of tarfile.extractall, as it takes too long.
job.run(
f"tar xfvz {image_archive_path} -C {image_path}", timeout_sec=120
)
reboot_to_bootloader(
fuchsia_device, use_ssh, fuchsia_reconnect_after_reboot_time
)
logging.info(
f'Flashing {fuchsia_device.mdns_name} with {image_archive_path} using authorized keys "{fuchsia_device.authorized_file}".'
)
run_flash_script(fuchsia_device, image_path)
return True
def reboot_to_bootloader(
fuchsia_device, use_ssh=False, fuchsia_reconnect_after_reboot_time=5
):
import psutil # type: ignore
import usbinfo # type: ignore
from antlion.controllers.fuchsia_lib.ssh import SSHError
if use_ssh:
logging.info("Sending reboot command via SSH to " "get into bootloader.")
# Sending this command will put the device in fastboot
# but it does not guarantee the device will be in fastboot
# after this command. There is no check so if there is an
# expectation of the device being in fastboot, then some
# other check needs to be done.
try:
fuchsia_device.ssh.run(
"dm rb", timeout_sec=fuchsia_reconnect_after_reboot_time
)
except SSHError as e:
if "closed by remote host" not in e.result.stderr:
raise e
else:
pass
## Todo: Add elif for SL4F if implemented in SL4F
time_counter = 0
while time_counter < FASTBOOT_TIMEOUT:
logging.info(
"Checking to see if fuchsia_device(%s) SN: %s is in "
"fastboot. (Attempt #%s Timeout: %s)"
% (
fuchsia_device.mdns_name,
fuchsia_device.serial_number,
str(time_counter + 1),
FASTBOOT_TIMEOUT,
)
)
for usb_device in usbinfo.usbinfo():
if (
usb_device["iSerialNumber"] == fuchsia_device.serial_number
and usb_device["iProduct"] == "USB_download_gadget"
):
logging.info(
"fuchsia_device(%s) SN: %s is in fastboot."
% (fuchsia_device.mdns_name, fuchsia_device.serial_number)
)
time_counter = FASTBOOT_TIMEOUT
time_counter = time_counter + 1
if time_counter == FASTBOOT_TIMEOUT:
for fail_usb_device in usbinfo.usbinfo():
logging.debug(fail_usb_device)
raise TimeoutError(
"fuchsia_device(%s) SN: %s "
"never went into fastboot"
% (fuchsia_device.mdns_name, fuchsia_device.serial_number)
)
time.sleep(1)
end_time = time.time() + WAIT_FOR_EXISTING_FLASH_TO_FINISH_SEC
# Attempt to wait for existing flashing process to finish
while time.time() < end_time:
flash_process_found = False
for proc in psutil.process_iter():
if "bash" in proc.name() and "flash.sh" in proc.cmdline():
logging.info("Waiting for existing flash.sh process to complete.")
time.sleep(PROCESS_CHECK_WAIT_TIME_SEC)
flash_process_found = True
if not flash_process_found:
break
def run_flash_script(fuchsia_device, flash_dir):
try:
flash_output = job.run(
f"bash {flash_dir}/flash.sh --ssh-key={fuchsia_device.authorized_file} -s {fuchsia_device.serial_number}",
timeout_sec=FLASH_TIMEOUT_SEC,
)
logging.debug(flash_output.stderr)
except job.TimeoutError as err:
raise TimeoutError(err)
logging.info(
"Waiting %s seconds for device"
" to come back up after flashing." % AFTER_FLASH_BOOT_TIME
)
time.sleep(AFTER_FLASH_BOOT_TIME)
logging.info("Updating device to new IP addresses.")
mdns_ip = None
for retry_counter in range(MDNS_LOOKUP_RETRY_MAX):
mdns_ip = get_fuchsia_mdns_ipv6_address(fuchsia_device.mdns_name)
if mdns_ip:
break
else:
time.sleep(1)
if mdns_ip and utils.is_valid_ipv6_address(mdns_ip):
logging.info(
"IP for fuchsia_device(%s) changed from %s to %s"
% (fuchsia_device.mdns_name, fuchsia_device.ip, mdns_ip)
)
fuchsia_device.ip = mdns_ip
fuchsia_device.address = "http://[{}]:{}".format(
fuchsia_device.ip, fuchsia_device.sl4f_port
)
else:
raise ValueError(f"Invalid IP: {fuchsia_device.mdns_name} after flashing.")