| #!/usr/bin/env python3 |
| # Copyright 2020 The Fuchsia Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| from __future__ import print_function |
| |
| import argparse |
| import functools |
| import json |
| import logging |
| import os |
| import platform |
| import stat |
| import subprocess |
| import sys |
| import time |
| import typing |
| |
| from typing import List |
| |
| try: |
| global CGPT_BIN |
| import paths |
| CGPT_BIN = os.path.join(paths.PREBUILT_DIR, 'tools/cgpt', |
| paths.PREBUILT_PLATFORM, 'cgpt') |
| except ImportError as e: |
| # If we're being run via `fx`, we take paths for cgpt and the Fuchsia images from |
| # the `import paths` above. Otherwise, we expect the paths to be provided via |
| # command-line arguments. The command-line arguments also override the autodetected |
| # paths if they're both present. |
| pass |
| |
| if sys.hexversion < 0x030700F0: |
| logging.critical( |
| 'This script requires Python >= 3.7 to run (you have %s), please upgrade!' |
| % (platform.python_version()), |
| file=sys.stderr) |
| sys.exit(1) |
| |
| WORKSTATION_INSTALLER_GPT_GUID = '4dce98ce-e77e-45c1-a863-caf92f1330c1' |
| |
| def make_unique_name(name, type): |
| return f'{name}_{type}' |
| |
| class ManifestImage: |
| """ Represents an entry in the 'images.json' manifest that will be installed to disk. """ |
| def __init__(self, name: str, guids: List[str], type: str): |
| """ |
| Args: |
| name: 'name' of the partition in the image manifest. |
| guids: List of GPT GUIDs that this partition should be written to the disk with. |
| type: 'type' of the partition in the image manifest. |
| """ |
| self.name = name |
| self.guids = guids |
| self.type = type |
| |
| def unique_name(self): |
| return make_unique_name(self.name, self.type) |
| |
| |
| # This is the list of Fuchsia build images we write to the final image, |
| # and the partition types they will have (passed to cgpt) |
| IMAGES = [ |
| # Standard x64 partitions |
| # This is the zedboot image, which is actually booted. |
| ManifestImage('zedboot-efi', ['efi'], 'blk'), |
| # This is the EFI system partition that will be installed to the target. |
| ManifestImage('efi', [WORKSTATION_INSTALLER_GPT_GUID], 'blk'), |
| ManifestImage('zircon-a', [WORKSTATION_INSTALLER_GPT_GUID], 'zbi'), |
| ManifestImage('zircon-r', [WORKSTATION_INSTALLER_GPT_GUID], 'zbi'), |
| |
| # ChromeOS partitions |
| # The zircon-r.signed partition is used as both zedboot on the installation |
| # disk and also the installed zircon-r partition. |
| ManifestImage('zircon-r.signed', ['kernel', WORKSTATION_INSTALLER_GPT_GUID], 'zbi.signed'), |
| ManifestImage('zircon-a.signed', [WORKSTATION_INSTALLER_GPT_GUID], 'zbi.signed'), |
| |
| # Common partitions - installed everywhere. |
| ManifestImage('storage-sparse', [WORKSTATION_INSTALLER_GPT_GUID], 'blk'), |
| ] |
| |
| |
| def ParseSize(size): |
| """Parse a size. |
| |
| Args: |
| size: '<number><suffix>', where suffix is 'K', 'M', or 'G'. |
| |
| Returns: |
| A size in bytes equivalent to the human-readable size given. |
| Raises: |
| A ValueError if <suffix> is unrecognised or <number> is not a base-10 |
| number. |
| """ |
| units = ['K', 'M', 'G'] |
| if size.isdigit(): |
| return int(size) |
| else: |
| unit = size[-1].upper() |
| size_bytes = int(size[:-1]) * 1024 |
| |
| if unit not in units: |
| raise ValueError('unrecognised unit suffix "{}" for size {}'.format( |
| unit, size)) |
| |
| while units.pop(0) != unit: |
| size_bytes *= 1024 |
| return size_bytes |
| |
| |
| def PrettySize(size): |
| """Returns a size in bytes as a human-readable string.""" |
| units = 'BKMGT' |
| |
| unit = 0 |
| # By the time we get to 3072, the error caused by |
| # shifting units is <2%, so we don't care. |
| while size > 3072 and unit < len(units) - 1: |
| size /= 1024 |
| unit += 1 |
| |
| return '{:1.1f}{}'.format(size, units[unit]) |
| |
| |
| class Partition: |
| """Represents a single partition to be written to the disk. |
| |
| Attributes: |
| label: label of the partition on the output image. |
| path: path of the file that is going to be written to the partition on |
| the host. |
| real_size: size of the file on the host, in bytes. |
| size: size of the partition, in bytes. This may not match real_size due |
| to sector size alignment or EFI partition rules. |
| type: type of the partition, passed to `cgpt`. |
| """ |
| FAT32_MIN_SIZE = (63 * 1024 * 1024) |
| |
| def __init__(self, path, part_type, label, size: typing.Optional[int] = None): |
| self.path = path |
| self.type = part_type |
| self.label = label |
| |
| # Calculate sector-aligned size of this file. |
| if path: |
| stat_result = os.stat(path) |
| if not stat.S_ISREG(stat_result.st_mode): |
| raise ValueError('{} is not a regular file.'.format(path)) |
| rounded_size = stat_result.st_size |
| size = stat_result.st_size |
| elif size: |
| rounded_size = size |
| else: |
| raise ValueError('Expected a source file or size') |
| if rounded_size % Image.SECTOR_SIZE != 0: |
| rounded_size += Image.SECTOR_SIZE |
| rounded_size -= rounded_size % Image.SECTOR_SIZE |
| if self.type == 'efi': |
| # Gigaboot won't be able to load zedboot.bin from an EFI partition that's |
| # too small, so we ensure the partition is at least big enough for it to |
| # work. |
| rounded_size = max(Partition.FAT32_MIN_SIZE, rounded_size) |
| self.real_size = size |
| self.size = rounded_size |
| |
| |
| class Image: |
| """Represents a single disk image to be written. |
| |
| Attributes: |
| filename: output filename of the image. |
| is_usb: True if writing to a USB, False if creating an image on the |
| host. |
| file: filehandle to the output image. Held open while we work to prevent |
| auto-mounting of USB. |
| block_size: number of bytes to write at a time to the disk. |
| file_size: total size of the image, in bytes |
| partitions: list of |Partition| objects to write to disk. |
| """ |
| SECTOR_SIZE = 512 |
| GPT_SECTORS = 2048 |
| CROS_RESERVED_SECTORS = 2048 |
| |
| def __init__(self, filename, is_usb, block_size): |
| self.filename = filename |
| self.is_usb = is_usb |
| self.file = open(filename, mode='wb') |
| self.block_size = block_size |
| |
| # Allocate space for the primary and backup GPTs |
| self.file_size = 2 * Image.GPT_SECTORS * Image.SECTOR_SIZE |
| self.partitions = [] |
| |
| # Set up the ChromeOS reserved partition. |
| reserved_part = Partition(None, 'reserved', 'reserved', |
| self.CROS_RESERVED_SECTORS * self.SECTOR_SIZE) |
| self.AddPartition(reserved_part) |
| |
| def _Cgpt(self, args): |
| args = [CGPT_BIN] + args |
| return subprocess.run(args, capture_output=True) |
| |
| def _CgptAdd(self, part, offset): |
| """Add a partition to the GPT represnted by thsis |Image|. |
| |
| Args: |
| part: partition to add |
| offset: offset to add partition at. Must be a multiple of SECTOR_SIZE. |
| Returns: |
| True if add succeded, False if it failed. |
| """ |
| if offset % Image.SECTOR_SIZE != 0: |
| raise ValueError('Offset must be a multiple of SECTOR_SIZE!') |
| if part.size % Image.SECTOR_SIZE != 0: |
| raise ValueError('Size must be a multiple of SECTOR_SIZE!') |
| size = part.size // Image.SECTOR_SIZE |
| offset //= Image.SECTOR_SIZE |
| cgpt_args = ['add', '-s', str(size), '-t', part.type, '-b', str(offset), |
| '-l', part.label, self.filename] |
| |
| if part.type == 'kernel': |
| # Mark CrOS kernel partition as bootable. |
| # We assume that the disk will only have 1 kernel partition. |
| # -T 1 = Bootloader will try to boot this partition once. |
| # -S 0 = This partition has been successfully booted. |
| # -P 2 = Partition priority is 2. Partition with the highest priority gets |
| # booted. |
| cgpt_args += ['-T', '1', '-S', '1', '-P', '2'] |
| |
| ret = self._Cgpt(cgpt_args) |
| if ret.returncode != 0: |
| logging.critical('\n' |
| '======= CGPT ADD FAILED! =======\n' |
| 'Maybe your disk is too small?\n') |
| logging.error(ret.stdout) |
| logging.error(ret.stderr) |
| return False |
| return True |
| |
| def AddPartition(self, partition): |
| """Add a partition to the outputted disk image. |
| |
| This function does not write any data - call Finalise() to write the |
| disk image once all partitions have been added. |
| |
| Args: |
| partition: partition to add |
| """ |
| self.partitions.append(partition) |
| self.file_size += partition.size |
| |
| def WritePart(self, part, offset): |
| """Writes data to a partition on the output device. |
| |
| Args: |
| part: partiton to write |
| offset: offset in bytes to write to |
| """ |
| if part.path is None: |
| return |
| self.file.seek(offset, 0) |
| |
| written = 0 |
| logging.info( |
| ' Writing image {} to partition {}... '.format( |
| part.path.split('/')[-1], part.label) |
| ) |
| with open(part.path, 'rb') as fh: |
| start = time.perf_counter() |
| for block in iter(functools.partial(fh.read, self.block_size), b''): |
| written += len(block) |
| self.file.write(block) |
| # flush and fsync to get accurate timing results |
| self.file.flush() |
| os.fsync(self.file.fileno()) |
| finish = time.perf_counter() |
| per_second = written / (finish - start) |
| logging.info(' Wrote {} in {:1.2f}s, {}/s'.format( |
| PrettySize(written), finish - start, PrettySize(per_second))) |
| |
| def Finalise(self): |
| """Write all the partitions this image represents to disk/file.""" |
| if not self.is_usb: |
| # first, make sure the file is big enough. |
| logging.info('Create image of size={} bytes'.format(self.file_size)) |
| self.file.truncate(self.file_size) |
| self.file.flush() |
| os.fsync(self.file.fileno()) |
| |
| logging.info('Creating new GPT partition table...') |
| self._Cgpt(['create', self.filename]) |
| self._Cgpt(['boot', '-p', self.filename]) |
| logging.info('Done.') |
| |
| logging.info('Creating and writing partitions...') |
| current_offset = Image.SECTOR_SIZE * Image.GPT_SECTORS |
| for part in self.partitions: |
| if not self._CgptAdd(part, current_offset): |
| logging.warning('Write failed, aborting.') |
| self.file.close() |
| return |
| self.WritePart(part, current_offset) |
| current_offset += part.size |
| logging.info('Done.') |
| self.file.close() |
| |
| def GetPartitions(build_dir): |
| """Get all partitions to be written to the output image. |
| |
| The list of partitions is currently determined by the IMAGES dict |
| at the top of this file. |
| |
| Args: |
| build_dir: path to the build directory containing images. |
| |
| Returns: |
| a list of |Partition| objects to be written to the disk. |
| """ |
| images = {} |
| images_file = os.path.join(build_dir, 'images.json') |
| try: |
| with open(images_file) as f: |
| images_list = json.load(f) |
| for image in images_list: |
| images[make_unique_name(image['name'], image['type'])] = image |
| except IOError as err: |
| logging.critical('Failed to find image manifest. Have you run `fx build`?', exc_info=err) |
| return [] |
| |
| ret = [] |
| is_bootable = False |
| for image in IMAGES: |
| if image.unique_name() not in images: |
| logging.debug("Skipping image that wasn't built: {}".format(image.unique_name())) |
| continue |
| |
| for part_type in image.guids: |
| full_path = os.path.join(build_dir, images[image.unique_name()]['path']) |
| ret.append(Partition(full_path, part_type, image.name)) |
| # Assume that any non-installer partition is a bootable partition. |
| if part_type != WORKSTATION_INSTALLER_GPT_GUID: |
| is_bootable = True |
| |
| if not is_bootable: |
| logging.critical('ERROR: mkinstaller would generate an unbootable image.' + |
| 'Are you building for a supported platform?') |
| return [] |
| |
| return ret |
| |
| |
| def GetUsbDisks(): |
| """Get a list of all USB disks on the system. |
| |
| Returns: |
| A list where each entry is of the format '/path/to/disk - <disk name>' |
| """ |
| res = subprocess.run(['fx', 'list-usb-disks'], capture_output=True) |
| res.check_returncode() |
| |
| disks = res.stdout.decode('utf-8').split('\n') |
| return disks |
| |
| |
| def IsUsbDisk(path): |
| """Is the given path a USB disk? |
| |
| Args: |
| path: a path that may represent a USB disk. Does not have to exist. |
| |
| Returns: |
| True if the path represents a USB disk, False otherwise. |
| """ |
| return path in map(lambda a: a.split()[0], GetUsbDisks()) |
| |
| |
| def EjectDisk(path): |
| """Eject the given USB disk from the system.""" |
| system = platform.system() |
| if system == 'Linux': |
| subprocess.run(['eject', path]) |
| elif system == 'Darwin': |
| subprocess.run(['diskutil', 'eject', path]) |
| logging.info('Ejected USB disk') |
| |
| |
| def Main(args): |
| path = args.FILE |
| if args.create: |
| if not args.force and os.path.exists(path): |
| logging.critical( |
| 'File {} already exists, not creating an image. Use --force if you want to proceed.' |
| .format(path), |
| file=sys.stderr) |
| return 1 |
| else: |
| if not os.path.exists(path): |
| logging.critical( |
| ('Path {} does not exist, use --create to create a disk image.\n' |
| 'Detected USB devices:\n' |
| '{}').format(path, '\n'.join(GetUsbDisks())), |
| end='', |
| file=sys.stderr) |
| return 1 |
| if not IsUsbDisk(path): |
| logging.critical( |
| ('Path {} is not a USB device. Use -f to force.\n' |
| 'Detected USB devices:\n' |
| '{}').format(path, '\n'.join(GetUsbDisks())), |
| end='', |
| file=sys.stderr) |
| return 1 |
| |
| if not os.access(path, os.W_OK) and sys.stdin.isatty(): |
| logging.warning('Changing ownership of {} to {}'.format(path, |
| os.environ.get('USER'))) |
| subprocess.run( |
| ['sudo', 'chown', os.environ.get('USER'), path], |
| stdin=sys.stdin, |
| stdout=sys.stdout, |
| stderr=sys.stderr) |
| elif not os.access(path, os.W_OK): |
| logging.critical('Cannot write to {}. Please check file permissions!'.format(path)) |
| return 1 |
| |
| build_dir = args.build_dir |
| if build_dir == '': |
| build_dir = paths.FUCHSIA_BUILD_DIR |
| parts = GetPartitions(build_dir) |
| if not parts: |
| return 1 |
| |
| output = Image(args.FILE, not args.create, ParseSize(args.block_size)) |
| for p in parts: |
| output.AddPartition(p) |
| |
| output.Finalise() |
| if not args.create: |
| EjectDisk(path) |
| return 0 |
| |
| |
| if __name__ == '__main__': |
| parser = argparse.ArgumentParser( |
| description='Create a Fuchsia installer image.', prog='fx mkinstaller') |
| parser.add_argument( |
| '-c', |
| '--create', |
| action='store_true', |
| help='Create a disk image instead of writing to an existing disk.') |
| parser.add_argument( |
| '-f', |
| '--force', |
| action='store_true', |
| help='Force writing to an image that already exists or a disk that might not be a USB.' |
| ) |
| parser.add_argument( |
| '-b', |
| '--block-size', |
| type=str, |
| default='2M', |
| help='Block size (optionally suffixed by K, M, G) to write. Default is 2M' |
| ) |
| parser.add_argument( |
| '-v', |
| '--verbose', |
| action='store_true', |
| help='Be verbose while creating disk images.' |
| ) |
| parser.add_argument( |
| '--cgpt-path', |
| type=str, |
| default='', |
| help='Path to cgpt in the Fuchsia tree. The script will try and guess if no path is provided.' |
| ) |
| parser.add_argument( |
| '--build-dir', |
| type=str, |
| default='', |
| help='Path to the Fuchsia build directory. The script will try and guess if no path is provided.' |
| ) |
| parser.add_argument('FILE', help='Path to USB device or installer image') |
| argv = parser.parse_args() |
| level = logging.WARNING |
| if argv.verbose: |
| level = logging.DEBUG |
| if argv.cgpt_path: |
| CGPT_BIN = argv.cgpt_path |
| logging.basicConfig(format='mkinstaller: %(levelname)s: %(message)s', level=level) |
| sys.exit(Main(argv)) |