blob: 526df85a37a27e0c509e0aa99cb874db79035870 [file] [log] [blame]
#!/usr/bin/env fuchsia-vendored-python
# Copyright 2020 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import print_function
import argparse
import functools
import json
import logging
import os
import platform
import stat
import subprocess
import sys
import tempfile
import time
import typing
from typing import List
try:
global CGPT_BIN
import paths
CGPT_BIN = os.path.join(
paths.PREBUILT_DIR, "tools/cgpt", paths.PREBUILT_PLATFORM, "cgpt"
)
except ImportError as e:
# If we're being run via `fx`, we take paths for cgpt and the Fuchsia images from
# the `import paths` above. Otherwise, we expect the paths to be provided via
# command-line arguments. The command-line arguments also override the autodetected
# paths if they're both present.
pass
if sys.hexversion < 0x030700F0:
logging.critical(
"This script requires Python >= 3.7 to run (you have %s), please upgrade!"
% (platform.python_version())
)
sys.exit(1)
# The GUIDs are from zircon/system/public/zircon/hw/gpt.h
WORKSTATION_INSTALLER_GPT_GUID = "4dce98ce-e77e-45c1-a863-caf92f1330c1"
ZIRCON_R_GPT_GUID = "a0e5cf57-2def-46be-a80c-a2067c37cd49"
VBMETA_R_GPT_GUID = "6a2460c3-cd11-4e8b-80a8-12cce268ed0a"
ABR_META_GPT_GUID = "1d75395d-f2c6-476b-a8b7-45cc1c97b476"
def make_unique_name(name, type):
return f"{name}_{type}"
class ManifestImage:
"""Represents an entry in the 'images.json' manifest that will be installed to disk."""
def __init__(
self, name: str, guids: List[str], type: str, dest_name: str = None
):
"""
Args:
name: 'name' of the partition in the image manifest.
guids: List of GPT GUIDs that this partition should be written to the disk with.
type: 'type' of the partition in the image manifest.
"""
self.name = name
self.guids = guids
self.type = type
if dest_name:
self.dest_name = dest_name
else:
self.dest_name = name
def unique_name(self):
return make_unique_name(self.name, self.type)
# This is the list of Fuchsia build images we write to the final image,
# and the partition types they will have (passed to cgpt)
IMAGES_RECOVERY_INSTALLER = [
# The recovery image for chromebook-x64.
ManifestImage("recovery-installer", ["kernel"], "zbi.signed", "zircon-r"),
# The recovery image and a bootloader for x64.
ManifestImage("recovery-installer", [ZIRCON_R_GPT_GUID], "zbi", "zircon-r"),
ManifestImage(
"recovery-installer", [VBMETA_R_GPT_GUID], "vbmeta", "vbmeta_r"
),
ManifestImage("fuchsia.esp", ["efi"], "blk"),
# Installer uses `WORKSTATION_INSTALLER_GPT_GUID` to tell that it is a partition
# to copy over. They have to come last to prevent gigaboot++ from using them
# for boot(gigaboot++ finds the first match when looking up in GPT).
# Standard x64 partitions
# This is the EFI system partition that will be installed to the target.
ManifestImage("fuchsia.esp", [WORKSTATION_INSTALLER_GPT_GUID], "blk"),
ManifestImage("zircon-a", [WORKSTATION_INSTALLER_GPT_GUID], "zbi"),
ManifestImage(
"zircon-a", [WORKSTATION_INSTALLER_GPT_GUID], "vbmeta", "vbmeta_a"
),
ManifestImage("zircon-r", [WORKSTATION_INSTALLER_GPT_GUID], "zbi"),
ManifestImage(
"zircon-r", [WORKSTATION_INSTALLER_GPT_GUID], "vbmeta", "vbmeta_r"
),
# ChromeOS partitions
# The zircon-r.signed partition is used as both zedboot on the installation
# disk and also the installed zircon-r partition.
ManifestImage(
"zircon-r.signed", [WORKSTATION_INSTALLER_GPT_GUID], "zbi.signed"
),
ManifestImage(
"zircon-a.signed", [WORKSTATION_INSTALLER_GPT_GUID], "zbi.signed"
),
# Common partitions - installed everywhere.
# Only one of these should exist at a time.
ManifestImage("storage-sparse", [WORKSTATION_INSTALLER_GPT_GUID], "blk"),
ManifestImage(
"storage-sparse", [WORKSTATION_INSTALLER_GPT_GUID], "fxfs-blk"
),
]
# This is the list of images for running recovery-eng, which offers userspace
# fastboot.
IMAGES_RECOVERY_FASTBOOT = [
# The recovery-eng image for chromebook-x64.
ManifestImage("recovery-eng", ["kernel"], "zbi.signed", "zircon-r"),
# The recovery-eng image and a bootloader for x64.
ManifestImage("recovery-eng", [ZIRCON_R_GPT_GUID], "zbi", "zircon-r"),
ManifestImage("fuchsia.esp", ["efi"], "blk"),
]
def ParseSize(size):
"""Parse a size.
Args:
size: '<number><suffix>', where suffix is 'K', 'M', or 'G'.
Returns:
A size in bytes equivalent to the human-readable size given.
Raises:
A ValueError if <suffix> is unrecognised or <number> is not a base-10
number.
"""
units = ["K", "M", "G"]
if size.isdigit():
return int(size)
else:
unit = size[-1].upper()
size_bytes = int(size[:-1]) * 1024
if unit not in units:
raise ValueError(
'unrecognised unit suffix "{}" for size {}'.format(unit, size)
)
while units.pop(0) != unit:
size_bytes *= 1024
return size_bytes
def PrettySize(size):
"""Returns a size in bytes as a human-readable string."""
units = "BKMGT"
unit = 0
# By the time we get to 3072, the error caused by
# shifting units is <2%, so we don't care.
while size > 3072 and unit < len(units) - 1:
size /= 1024
unit += 1
return "{:1.1f}{}".format(size, units[unit])
class Partition:
"""Represents a single partition to be written to the disk.
Attributes:
label: label of the partition on the output image.
path: path of the file that is going to be written to the partition on
the host.
real_size: size of the file on the host, in bytes.
size: size of the partition, in bytes. This may not match real_size due
to sector size alignment or EFI partition rules.
type: type of the partition, passed to `cgpt`.
"""
FAT32_MIN_SIZE = 63 * 1024 * 1024
def __init__(
self, path, part_type, label, size: typing.Optional[int] = None
):
self.path = path
self.type = part_type
self.label = label
# Calculate sector-aligned size of this file.
if path:
stat_result = os.stat(path)
if not stat.S_ISREG(stat_result.st_mode):
raise ValueError("{} is not a regular file.".format(path))
rounded_size = stat_result.st_size
size = stat_result.st_size
elif size:
rounded_size = size
else:
raise ValueError("Expected a source file or size")
if rounded_size % Image.SECTOR_SIZE != 0:
rounded_size += Image.SECTOR_SIZE
rounded_size -= rounded_size % Image.SECTOR_SIZE
if self.type == "efi":
# Gigaboot won't be able to load zedboot.bin from an EFI partition that's
# too small, so we ensure the partition is at least big enough for it to
# work.
rounded_size = max(Partition.FAT32_MIN_SIZE, rounded_size)
self.real_size = size
self.size = rounded_size
class Image:
"""Represents a single disk image to be written.
Attributes:
filename: output filename of the image.
is_usb: True if writing to a USB, False if creating an image on the
host.
file: filehandle to the output image. Held open while we work to prevent
auto-mounting of USB.
block_size: number of bytes to write at a time to the disk.
file_size: total size of the image, in bytes
partitions: list of |Partition| objects to write to disk.
copy_image: pre-existing USB image to copy. Only one of |partitions| or
|copy_image| can be set.
"""
SECTOR_SIZE = 512
GPT_SECTORS = 2048
CROS_RESERVED_SECTORS = 2048
def __init__(self, filename, is_usb, block_size):
self.filename = filename
self.is_usb = is_usb
self.file = open(filename, mode="wb")
self.block_size = block_size
# Allocate space for the primary and backup GPTs
self.file_size = 2 * Image.GPT_SECTORS * Image.SECTOR_SIZE
self.partitions = []
self.copy_image = None
def _Cgpt(self, args):
args = [CGPT_BIN] + args
return subprocess.run(args, capture_output=True)
def _CgptAdd(self, part, offset):
"""Add a partition to the GPT represented by this |Image|.
Args:
part: partition to add
offset: offset to add partition at. Must be a multiple of SECTOR_SIZE.
Returns:
True if add succeded, False if it failed.
"""
if offset % Image.SECTOR_SIZE != 0:
raise ValueError("Offset must be a multiple of SECTOR_SIZE!")
if part.size % Image.SECTOR_SIZE != 0:
raise ValueError("Size must be a multiple of SECTOR_SIZE!")
size = part.size // Image.SECTOR_SIZE
offset //= Image.SECTOR_SIZE
cgpt_args = [
"add",
"-s",
str(size),
"-t",
part.type,
"-b",
str(offset),
"-l",
part.label,
self.filename,
]
if part.type == "kernel":
# Mark CrOS kernel partition as bootable.
# We assume that the disk will only have 1 kernel partition.
# -T 1 = Bootloader will try to boot this partition once.
# -S 0 = This partition has been successfully booted.
# -P 2 = Partition priority is 2. Partition with the highest priority gets
# booted.
cgpt_args += ["-T", "1", "-S", "1", "-P", "2"]
ret = self._Cgpt(cgpt_args)
if ret.returncode != 0:
logging.critical(
"\n"
"======= CGPT ADD FAILED! =======\n"
"Maybe your disk is too small?\n"
)
logging.error(ret.stdout)
logging.error(ret.stderr)
return False
return True
def AddPartition(self, partition):
"""Add a partition to the outputted disk image.
This function does not write any data - call Finalise() to write the
disk image once all partitions have been added.
Args:
partition: partition to add
"""
self.partitions.append(partition)
self.file_size += partition.size
def CopyImage(self, image):
"""Copies a complete image to the output file.
Args:
image: path to the image to copy.
"""
self.copy_image = image
self.file_size = os.stat(image).st_size
self.Finalise()
def _CopyToFile(self, path, offset):
"""Copies bytes to the output file.
|self.file| must have already been resized large enough to hold
the contents at |path|.
Args:
path: path to the source file.
offset: offset within |self.file| to copy to.
"""
self.file.seek(offset, 0)
written = 0
with open(path, "rb") as fh:
start = time.perf_counter()
for block in iter(functools.partial(fh.read, self.block_size), b""):
written += len(block)
self.file.write(block)
# flush and fsync to get accurate timing results
self.file.flush()
os.fsync(self.file.fileno())
finish = time.perf_counter()
per_second = written / (finish - start)
logging.info(
" Wrote {} in {:1.2f}s, {}/s".format(
PrettySize(written), finish - start, PrettySize(per_second)
)
)
def Finalise(self):
"""Write all the partitions this image represents to disk/file."""
# Make sure we didn't try to add partitions and a full image to copy.
if self.partitions and self.copy_image:
raise ValueError("Cannot copy partitions and a full image")
if not self.is_usb:
# first, make sure the file is big enough.
logging.info("Create image of size={} bytes".format(self.file_size))
self.file.truncate(self.file_size)
self.file.flush()
os.fsync(self.file.fileno())
if self.partitions:
logging.info("Creating new GPT partition table...")
self._Cgpt(["create", self.filename])
self._Cgpt(["boot", "-p", self.filename])
logging.info("Done.")
logging.info("Creating and writing partitions...")
current_offset = Image.SECTOR_SIZE * Image.GPT_SECTORS
for part in self.partitions:
if not self._CgptAdd(part, current_offset):
logging.warning("Write failed, aborting.")
self.file.close()
return
if part.path:
logging.info(
" Writing image {} to partition {}... ".format(
part.path.split("/")[-1], part.label
)
)
self._CopyToFile(part.path, current_offset)
current_offset += part.size
elif self.copy_image:
logging.info("Copying image from %s", self.copy_image)
self._CopyToFile(self.copy_image, 0)
else:
raise ValueError("No contents found")
logging.info("Done.")
self.file.close()
def GetPartitions(build_dir, images_file, target_images):
"""Get all partitions to be written to the output image.
The list of partitions is currently determined by the IMAGES dict
at the top of this file.
Args:
build_dir: path to the build directory containing images.
images_file: path to images.json. If None, will generate from build_dir
Returns:
a list of |Partition| objects to be written to the disk.
"""
use_signed_images = False
images = {}
if images_file == "":
images_file = os.path.join(build_dir, "images.json")
try:
with open(images_file) as f:
images_list = json.load(f)
for image in images_list:
images[make_unique_name(image["name"], image["type"])] = image
if image["type"] == "zbi.signed":
use_signed_images = True
except IOError as err:
logging.critical(
"Failed to find image manifest. Have you run `fx build`?",
exc_info=err,
)
return []
ret = []
is_bootable = False
for image in target_images:
if image.unique_name() not in images:
logging.debug(
"Skipping image that wasn't built: {}".format(
image.unique_name()
)
)
continue
if use_signed_images and image.type == "zbi":
logging.debug(
"Skipping unsigned image: {}".format(image.unique_name())
)
continue
for part_type in image.guids:
full_path = os.path.join(
build_dir, images[image.unique_name()]["path"]
)
ret.append(Partition(full_path, part_type, image.dest_name))
# Assume that any non-installer partition is a bootable partition.
if part_type != WORKSTATION_INSTALLER_GPT_GUID:
is_bootable = True
if not is_bootable:
logging.critical(
"ERROR: mkinstaller would generate an unbootable image."
+ "Are you building for a supported platform?"
)
return []
return ret
def GetUsbDisks():
"""Get a list of all USB disks on the system.
Returns:
A list where each entry is of the format '/path/to/disk - <disk name>'
"""
res = subprocess.run(["fx", "list-usb-disks"], capture_output=True)
res.check_returncode()
disks = [d for d in res.stdout.decode("utf-8").split("\n") if d]
return disks
def IsUsbDisk(path):
"""Is the given path a USB disk?
Args:
path: a path that may represent a USB disk. Does not have to exist.
Returns:
True if the path represents a USB disk, False otherwise.
"""
return path in map(lambda a: a.split()[0], GetUsbDisks())
def UnmountDisk(path):
"""Unmount the given USB disk from the system."""
system = platform.system()
if system == "Darwin":
subprocess.run(["diskutil", "quiet", "unmountDisk", path])
def EjectDisk(path):
"""Eject the given USB disk from the system."""
system = platform.system()
if system == "Linux":
subprocess.run(["eject", path])
elif system == "Darwin":
subprocess.run(["diskutil", "eject", path])
logging.info("Ejected USB disk")
def Main(args):
if args.temp_remote_image_only:
# We have to let the temporary directory persist past program exit
# since the local host will need to copy it over; it's the caller's
# responsibility to clean it up when finished.
temp_dir = tempfile.mkdtemp()
path = os.path.join(temp_dir, "installer.img")
# Print the file name to stdout so the caller knows where to find it.
print(path)
# --temp-remote-image-only always implies --create.
args.create = True
else:
path = args.FILE
if args.create:
if not args.force and os.path.exists(path):
logging.critical(
"File {} already exists, not creating an image. Use --force if you want to proceed.".format(
path
)
)
return 1
else:
if not os.path.exists(path):
logging.critical(
(
"Path {} does not exist, use --create to create a disk image.\n"
"Detected USB devices:\n"
"{}"
).format(path, "\n".join(GetUsbDisks()))
)
return 1
if not IsUsbDisk(path):
logging.critical(
(
"Path {} is not a USB device. Use -f to force.\n"
"Detected USB devices:\n"
"{}"
).format(path, "\n".join(GetUsbDisks()))
)
return 1
if not os.access(path, os.W_OK) and sys.stdin.isatty():
logging.warning(
"Changing ownership of {} to {}".format(
path, os.environ.get("USER")
)
)
subprocess.run(
["sudo", "chown", os.environ.get("USER"), path],
stdin=sys.stdin,
stdout=sys.stdout,
stderr=sys.stderr,
)
elif not os.access(path, os.W_OK):
logging.critical(
"Cannot write to {}. Please check file permissions!".format(
path
)
)
return 1
UnmountDisk(path)
output = Image(path, not args.create, ParseSize(args.block_size))
if args.from_image:
# Just copy the existing image file.
output.CopyImage(args.from_image)
else:
# Set up the ChromeOS reserved partition.
reserved_part = Partition(
None,
"reserved",
"reserved",
Image.CROS_RESERVED_SECTORS * Image.SECTOR_SIZE,
)
output.AddPartition(reserved_part)
build_dir = args.build_dir
if build_dir == "":
build_dir = paths.FUCHSIA_BUILD_DIR
if args.recovery_fastboot:
target_images = IMAGES_RECOVERY_FASTBOOT
else:
target_images = IMAGES_RECOVERY_INSTALLER
parts = GetPartitions(build_dir, args.images, target_images)
if not parts:
return 1
# Add a abr metadata partition
with tempfile.TemporaryDirectory() as temp_dir:
# A pre-generated abr metadata that can only boots zircon-r
abr_data_file = os.path.join(temp_dir, "abr_data")
with open(abr_data_file, "wb") as abr_data:
abr_data.write(
(
b"\x00\x41\x42\x30\x02\x01\x00\x00\x0f\x00\x00\x00\x0e\x00\x00\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x6b\xa3\x22\x12"
).ljust(
1024 * 1024
) # Make a 1MB partition.
)
# TODO(b/268532862): Use new GUID once switched to gigaboot++.
parts.append(
Partition(str(abr_data_file), ABR_META_GPT_GUID, "durable_boot")
)
for p in parts:
output.AddPartition(p)
output.Finalise()
if not args.create:
EjectDisk(path)
return 0
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Create a Fuchsia installer image.",
prog="fx mkinstaller[-remote]",
)
parser.add_argument(
"-c",
"--create",
action=argparse.BooleanOptionalAction,
default=False,
help="Create a disk image instead of writing to an existing disk.",
)
parser.add_argument(
"-f",
"--force",
action=argparse.BooleanOptionalAction,
default=False,
help="Force writing to an image that already exists or a disk that might not be a USB.",
)
parser.add_argument(
"-b",
"--block-size",
type=str,
default="2M",
help="Block size (optionally suffixed by K, M, G) to write. Default is 2M",
)
parser.add_argument(
"-v",
"--verbose",
action=argparse.BooleanOptionalAction,
default=False,
help="Be verbose while creating disk images.",
)
parser.add_argument(
"--cgpt-path",
type=str,
default="",
help="Path to cgpt in the Fuchsia tree. The script will try and guess if no path is provided.",
)
parser.add_argument(
"--images",
type=str,
default="",
help="Path to images.json in the Fuchsia tree. Default to build_dir/images.json.",
)
parser.add_argument(
"--build-dir",
type=str,
default="",
help="Path to the Fuchsia build directory. The script will try and guess if no path is provided.",
)
parser.add_argument(
"--from-image",
help="Only write an existing installer image at this path to the out path.",
)
parser.add_argument(
"--new-installer",
action=argparse.BooleanOptionalAction,
default=False,
help="DEPRECATED. Has no effect.",
)
parser.add_argument(
"--recovery-fastboot",
action=argparse.BooleanOptionalAction,
default=False,
help="Create a bootable recovery-eng image with userspace fastboot.",
)
parser.add_argument(
"host",
nargs="?",
help="Build host name; only used for `mkinstaller-remote`.",
)
parser.add_argument(
"host_dir",
nargs="?",
help="Build host root Fuchsia dir; only used for `mkinstaller-remote`.",
)
parser.add_argument("FILE", help="Path to USB device or installer image")
# Internal-only helper args for mkinstaller-remote.
# --print-host-args-only just prints the host/host_dir args if they exist
# and exits, so we don't have to re-implement all the commandline parsing
# in the mkinstaller-remote shell script.
parser.add_argument(
"--print-host-args-only",
action=argparse.BooleanOptionalAction,
default=False,
help=argparse.SUPPRESS,
)
# --temp-remote-image-only is similar to --create, but it ignores the |FILE|
# argument and instead creates a new image in /tmp then prints the resulting
# path. Used by mkinstaller-remote to override the local |FILE| argument on
# the remote host.
parser.add_argument(
"--temp-remote-image-only",
action=argparse.BooleanOptionalAction,
default=False,
help=argparse.SUPPRESS,
)
argv = parser.parse_args()
level = logging.WARNING
if argv.verbose:
level = logging.DEBUG
if argv.cgpt_path:
CGPT_BIN = argv.cgpt_path
logging.basicConfig(
format="mkinstaller: %(levelname)s: %(message)s", level=level
)
if argv.print_host_args_only:
print(" ".join([arg for arg in (argv.host, argv.host_dir) if arg]))
sys.exit(0)
sys.exit(Main(argv))