blob: cf0389a50ff14242db93b92a3eb9e63d872bed7e [file] [log] [blame]
#!/usr/bin/env fuchsia-vendored-python
#
# Copyright 2022 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Script to create Fuchsia GPT images.
This script wraps the `sgdisk` utility to provide a simpler interface tailored
specifically for Fuchsia GPT images.
When creating a new GPT, the layout is specified by a JSON file that looks like:
{
"size_mib": disk size in MiB
"block_size": disk R/W block size in bytes
"alignment_kib": partition start alignment in KiB; should generally be a
multiple of the disk erase block size
"partitions": [
{
"name": partition name
"size_kib": size in KiB, or "fill" to use the rest of the disk
"unique_guid": unique GUID; omit to assign randomly
"type_guid": type GUID, or one of ("fvm", "vbmeta", "zircon") to use
the standard Fuchsia types
}
]
}
Each partition starts immediately following the previous; this tool does not
support creating GPTs with out-of-order partitions or gaps between partitions.
"""
import argparse
import contextlib
import dataclasses
import json
import logging
import os
import pathlib
import re
import subprocess
import sys
import tempfile
import textwrap
from typing import Dict, Iterator, List, Optional
# A regexp to capture GUIDs as reported by `sgdisk`.
# Example: 4B7A23A5-E9D1-456A-BACB-78B23D855324.
# Only uppercase hex digits are matched, and the pattern is unanchored so it
# can be embedded inside larger expressions.
GUID_RE = r"[0-9A-F]{8}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{12}"
# Maps partition type names to known type GUIDs.
# https://cs.opensource.google/fuchsia/fuchsia/+/main:zircon/system/public/zircon/hw/gpt.h.
# These names may be given as the "type_guid" of a partition in a layout spec
# in place of a full GUID.
_TYPE_GUID_FROM_NAME = {
    "fvm": "49FD7CB8-DF15-4E73-B9D9-992070127F0F",
    "vbmeta": "421A8BFC-85D9-4D85-ACDA-B64EEC0133E9",
    "zircon": "9B37FFF6-2E58-466A-983A-F7926D0B04E0",
}
@dataclasses.dataclass
class Partition:
    """Description of a single entry in a GPT partition table.

    All positions and sizes are expressed in logical blocks (LBA) rather than
    KiB/MiB: that is the unit GPT itself uses, and because a block cannot be
    subdivided every value is an exact integer.
    """

    # Position of the entry in the GPT entry array, counting from 1.
    index: int
    # Human-readable partition name.
    name: str
    # First block occupied by the partition.
    first_lba: int
    # Last block occupied by the partition (inclusive).
    last_lba: int
    # Number of blocks in the partition.
    size_lba: int
    # GUID identifying the partition type.
    type_guid: str
    # GUID identifying this particular partition.
    unique_guid: str
    # Attribute flags, kept as the string `sgdisk` reports.
    attr_flags: str
@contextlib.contextmanager
def gen_disk_image(source_gpt: str, image_size_mib: int) -> Iterator[str]:
    """Context manager yielding a scratch disk image carrying `source_gpt`.

    A sparse file of `image_size_mib` MiB is created inside a temporary
    directory and `sgdisk --load-backup` is used to write the GPT stored in
    `source_gpt` onto it. The yielded path is only valid inside the `with`
    body; the temporary directory is removed afterwards.

    Raises:
        subprocess.CalledProcessError: if `sgdisk` exits with an error.
    """
    print("Generating disk image")
    with tempfile.TemporaryDirectory() as scratch_dir:
        image_path = os.path.join(scratch_dir, "disk_image")
        # Truncating a fresh file up to the target size produces an empty
        # image without writing the data out.
        with open(image_path, "wb") as image:
            image.truncate(image_size_mib * 1024 * 1024)

        load_command = ["sgdisk", f"--load-backup={source_gpt}", image_path]
        subprocess.run(load_command, capture_output=True, check=True)

        yield image_path
        print("Deleting disk image")
# This function is a bit brittle as it depends heavily on `sgdisk` output format
# which is not guaranteed to be stable. If it becomes unreliable we may either
# need to bring a fixed version of sgdisk into //prebuilts, or just implement
# GPT handling ourselves.
def get_partitions(disk_file: str) -> List[Partition]:
    """Returns a list of all Partitions.

    Runs `sgdisk --print` to discover the partition indices, then
    `sgdisk --info` once per index to collect the details.

    Args:
        disk_file: path to the disk image (or device) to inspect.

    Raises:
        subprocess.CalledProcessError: if `sgdisk` exits with an error.
        ValueError: if the `sgdisk --info` output is missing an expected field.
    """

    def get_param(info: str, pattern: str) -> str:
        """Returns group 1 of `pattern` matched at the start of a line."""
        match = re.search(f"^{pattern}.*?$", info, re.MULTILINE)
        if match is None:
            # Fail with context instead of an AttributeError on `None`.
            raise ValueError(
                f"Failed to parse sgdisk output of {disk_file}: no match for"
                f" {pattern!r} in:\n{info}"
            )
        return match.group(1)

    # Find all the partition indices first.
    # Output looks like:
    # -----------------------------------------------------------------
    # Disk test_disk.img: 131072 sectors, 64.0 MiB
    # Sector size (logical): 512 bytes
    # Disk identifier (GUID): F1D74D6E-1DB4-4E46-8D2A-87AC93543ACA
    # Partition table holds up to 128 entries
    # Main partition table begins at sector 2 and ends at sector 33
    # First usable sector is 34, last usable sector is 131038
    # Partitions will be aligned on 2048-sector boundaries
    # Total free space is 126909 sectors (62.0 MiB)
    #
    # Number  Start (sector)    End (sector)  Size       Code  Name
    #    1            2048            4095   1024.0 KiB  8300
    #    2            4096            6143   1024.0 KiB  8300
    # -----------------------------------------------------------------
    # Here we just care about the index, which will be the first entry on lines
    # that start with a number.
    listing = subprocess.run(
        ["sgdisk", disk_file, "--print"],
        text=True,
        capture_output=True,
        check=True,
    ).stdout
    indices = re.findall(r"^\s*(\d+)\s+", listing, re.MULTILINE)

    partitions = []
    for index in indices:
        # Now fill in the full partition information. Output looks like:
        # -----------------------------------------------------------------
        # Partition GUID code: 0FC63DAF-8483-4772-8E79-3D69D8477DE4 (Linux filesystem)
        # Partition unique GUID: 4B7A23A5-E9D1-456A-BACB-78B23D855324
        # First sector: 2048 (at 1024.0 KiB)
        # Last sector: 4095 (at 2.0 MiB)
        # Partition size: 2048 sectors (1024.0 KiB)
        # Attribute flags: 0000000000000000
        # Partition name: 'foo'
        # -----------------------------------------------------------------
        # We could potentially query all partitions in a single command, but
        # the extra delay is negligible and this is simpler.
        info = subprocess.run(
            ["sgdisk", disk_file, "--info", index],
            text=True,
            capture_output=True,
            check=True,
        ).stdout

        partitions.append(
            Partition(
                index=int(index),
                # Greedy match up to the last quote on the line so that names
                # which themselves contain an apostrophe are not truncated.
                name=get_param(info, r"Partition name: '(.*)'"),
                first_lba=int(get_param(info, r"First sector: (\d+)")),
                last_lba=int(get_param(info, r"Last sector: (\d+)")),
                size_lba=int(get_param(info, r"Partition size: (\d+) sectors")),
                type_guid=get_param(info, f"Partition GUID code: ({GUID_RE})"),
                unique_guid=get_param(
                    info, f"Partition unique GUID: ({GUID_RE})"
                ),
                attr_flags=get_param(info, r"Attribute flags: (\S*)"),
            )
        )
    return partitions
def get_partition(
    disk_file: str,
    *,
    part_name: Optional[str] = None,
    index: Optional[int] = None,
) -> Partition:
    """Returns a single Partition keyed by name and/or index.

    When both `part_name` and `index` are given, the partition must match
    both. The first matching partition is returned.

    Args:
        disk_file: path to the disk image to inspect.
        part_name: partition name to look for, or None to match any name.
        index: 1-based partition index to look for, or None to match any.

    Raises:
        ValueError: if neither `part_name` nor `index` was given, or if the
            requested partition couldn't be found.
    """
    if part_name is None and index is None:
        raise ValueError("Must specify either a name or index")

    for part in get_partitions(disk_file):
        if part_name is not None and part.name != part_name:
            continue
        if index is not None and part.index != index:
            continue
        return part

    # Report every criterion that was used, so that an index-only lookup does
    # not produce a misleading "partition None" message.
    criteria = []
    if part_name is not None:
        criteria.append(f"name {part_name!r}")
    if index is not None:
        criteria.append(f"index {index}")
    description = " and ".join(criteria)
    raise ValueError(f"Failed to find partition with {description}")
def get_part_num(disk_file: str, part_name: str) -> int:
    """Looks up a partition by name and returns its numeric index.

    Raises:
        ValueError: if no partition called `part_name` exists.
    """
    found = get_partition(disk_file, part_name=part_name)
    return found.index
def mib_to_lba(mib: int, blk_size: int) -> int:
    """Converts a size in MiB to a number of logical blocks (LBA).

    Args:
        mib: size in MiB.
        blk_size: block size in bytes.

    Returns:
        The number of whole blocks; any partial block is dropped.
    """
    # Floor division keeps integer inputs exact. True division goes through a
    # float and silently loses precision for byte counts above 2**53.
    return int(mib * 1024 * 1024 // blk_size)
def save_gpt(
    disk_path: pathlib.Path,
    block_size: int,
    *,
    mbr_path: Optional[pathlib.Path] = None,
    primary_path: Optional[pathlib.Path] = None,
    backup_path: Optional[pathlib.Path] = None,
):
    """Copies the MBR and GPT regions of a disk image into separate files.

    Args:
        disk_path: path to the disk image.
        block_size: block size in bytes.
        mbr_path: where to write the MBR binary; skipped when None.
        primary_path: where to write the primary GPT binary; skipped when None.
        backup_path: where to write the backup GPT binary; skipped when None.
    """
    # Block 0 holds the MBR.
    mbr_len = block_size
    # Each GPT copy is a one-block header plus 16KiB of partition entries.
    # The entry array may legally be larger, but none of our layouts need it.
    gpt_len = block_size + (16 * 1024)

    with open(disk_path, "rb") as image:
        # The MBR and primary GPT sit back to back at the start of the disk.
        sections = [image.read(mbr_len), image.read(gpt_len)]
        # The backup GPT occupies the tail of the disk.
        image.seek(-gpt_len, os.SEEK_END)
        sections.append(image.read())

    destinations = (mbr_path, primary_path, backup_path)
    for destination, payload in zip(destinations, sections):
        if not destination:
            continue
        destination.write_bytes(payload)
def delete_part(disk_file: str, part_num: int):
    """Removes partition number `part_num` from the GPT in `disk_file`.

    Raises:
        subprocess.CalledProcessError: if `sgdisk` exits with an error.
    """
    command = ["sgdisk", "--delete", str(part_num), disk_file]
    subprocess.run(command, capture_output=True, check=True)
def delete_part_by_name(disk_file: str, name: str):
    """Removes the partition called `name` from the GPT in `disk_file`.

    Raises:
        ValueError: if no partition with that name exists.
    """
    delete_part(disk_file, get_part_num(disk_file, name))
def new_part(disk_file: str, part_num: int, part: Partition):
    """Adds partition `part` to the GPT in `disk_file` as entry `part_num`.

    The block range, name, type GUID and unique GUID are applied with one
    `sgdisk` invocation each, in that order. `part.attr_flags` is not
    applied - this is fine for now as current Fuchsia devices don't depend
    on it.

    Raises:
        subprocess.CalledProcessError: if any `sgdisk` call exits with an
            error.
    """
    # An empty unique GUID means no specific one was requested; "R" makes
    # sgdisk create a random one.
    unique_guid = part.unique_guid if part.unique_guid else "R"

    steps = (
        # "-a 1" sets the alignment to a single sector - presumably so the
        # requested first/last LBAs are used as given; TODO confirm.
        ["-a", "1", "--new", f"{part_num}:{part.first_lba}:{part.last_lba}"],
        ["--change-name", f"{part_num}:{part.name}"],
        ["--typecode", f"{part_num}:{part.type_guid}"],
        ["--partition-guid", f"{part_num}:{unique_guid}"],
    )
    for options in steps:
        subprocess.run(
            ["sgdisk", *options, disk_file],
            capture_output=True,
            check=True,
        )
def gpt_rename_part(
    disk_image: str,
    old_part_name: str,
    new_part_name: str,
    new_part_type_guid: Optional[str] = None,
):
    """Rename a partition in the GPT, optionally changing its type GUID.

    The existing entry is deleted and a new one is created at the same index
    covering the same block range. The recreated entry gets a newly generated
    unique GUID, and its attribute flags are not restored because new_part()
    does not set them.

    Args:
        disk_image: path to the disk image to modify.
        old_part_name: current name of the partition.
        new_part_name: name to give the partition.
        new_part_type_guid: new type GUID, or None/empty to keep the current
            one.

    Raises:
        ValueError: if no partition named `old_part_name` exists.
        subprocess.CalledProcessError: if `sgdisk` exits with an error.
    """
    orig_part = get_partition(disk_image, part_name=old_part_name)
    # Copy the original entry, overriding only the fields that change.
    renamed_part = dataclasses.replace(
        orig_part,
        name=new_part_name,
        type_guid=new_part_type_guid or orig_part.type_guid,
        unique_guid="",  # generate new
    )
    delete_part(disk_image, orig_part.index)
    new_part(disk_image, orig_part.index, renamed_part)
def create_gpt(spec: Dict, out_dir: pathlib.Path):
    """Creates a GPT from the given specification.

    See the file docstring for a description of the spec.

    The GPT is built on a temporary disk image with a single `sgdisk`
    invocation, then `mbr.bin`, `primary.bin`, `backup.bin` and a `README.md`
    describing the result are written to `out_dir`.

    Args:
        spec: the layout specification.
        out_dir: directory to write resulting images; created, including any
            missing parents, if it doesn't exist.

    Raises:
        ValueError: if the given spec is invalid.
        KeyError: if a required spec field is missing.
        NotImplementedError: if the block size is not 512 bytes.
        subprocess.CalledProcessError: if `sgdisk` exits with an error.
    """
    disk_size = spec["size_mib"] * 1024 * 1024
    alignment = spec["alignment_kib"] * 1024
    block_size = spec["block_size"]

    # TODO: sgdisk appears to hardcode a 512-byte block size when working with
    # raw image files and I can't find any option to modify this behavior. For
    # now all our disks also have 512-byte blocks so it's fine, just punt on
    # this until we need it.
    if block_size != 512:
        raise NotImplementedError(
            "Only 512-byte blocks are currently implemented"
        )
    # Reject non-positive sizes up front; a zero alignment would otherwise
    # surface as a ZeroDivisionError from the modulo checks below.
    if disk_size <= 0:
        raise ValueError("Disk size must be positive")
    if alignment <= 0:
        raise ValueError("Disk alignment must be positive")
    if disk_size % alignment != 0:
        raise ValueError("Disk size must be a multiple of alignment")
    if alignment % block_size != 0:
        raise ValueError("Disk alignment must be a multiple of block size")

    with tempfile.TemporaryDirectory() as temp_dir:
        disk_path = pathlib.Path(temp_dir) / "disk.img"
        with open(disk_path, "wb") as disk_file:
            disk_file.truncate(disk_size)

        # We generate a single big `sgdisk` command so that it only has to read
        # the file once, do all the operations in memory, and then write the
        # final result once.

        # Initialize the disk.
        command = ["sgdisk", disk_path, "--clear"]

        # Partition alignment is in block units.
        command += ["--set-alignment", str(alignment // block_size)]

        # Add the partitions.
        for i, part in enumerate(spec["partitions"], start=1):
            name = part["name"]
            size_kib = part["size_kib"]
            if size_kib == "fill":
                # "fill" means use the rest of the disk, which sgdisk does
                # if you pass "0" as the end sector.
                end = 0
            else:
                # Otherwise, use "+" notation to tell sgdisk to set the end
                # relative to the partition start (i.e. partition size rather
                # than end position).
                end = f"+{size_kib}K"

            # If a unique GUID isn't provided, "R" tells sgdisk to generate one
            # randomly.
            unique_guid = part.get("unique_guid", "R")

            # Translate a known type name to its GUID; anything else is
            # passed through to sgdisk unchanged.
            type_guid = part["type_guid"]
            type_guid = _TYPE_GUID_FROM_NAME.get(type_guid, type_guid)

            # A start sector of 0 leaves the placement to sgdisk, so each
            # partition follows the previous one.
            command += ["--new", f"{i}:0:{end}"]
            command += ["--change-name", f"{i}:{name}"]
            command += ["--typecode", f"{i}:{type_guid}"]
            command += ["--partition-guid", f"{i}:{unique_guid}"]

        # Run the command to write the GPT.
        subprocess.run(command, check=True, capture_output=True, text=True)

        # Extract the pieces to individual files.
        out_dir.mkdir(parents=True, exist_ok=True)
        save_gpt(
            disk_path,
            block_size,
            mbr_path=out_dir / "mbr.bin",
            primary_path=out_dir / "primary.bin",
            backup_path=out_dir / "backup.bin",
        )

        # Grab a high level summary and the final partition details for the
        # README file.
        summary = subprocess.run(
            ["sgdisk", disk_path, "--print"],
            check=True,
            capture_output=True,
            text=True,
        ).stdout.strip()
        partitions = get_partitions(disk_path)

    # Write a README to document this GPT so it's easy for a reader to see
    # where it came from and what it looks like.
    (out_dir / "README.md").write_text(
        textwrap.dedent(
            """\
            # {name} GPT binaries
            Generated by
            https://cs.opensource.google/fuchsia/fuchsia/+/main:src/firmware/tools/gpt.py.
            ## Summary
            ```
            {summary}
            ```
            ## Details
            ```
            {details}
            ```
            ## Input specification
            ```
            {spec}
            ```
            ## Command
            ```
            {command}
            ```
            """
        ).format(
            name=out_dir.name,
            summary=summary,
            details=json.dumps(
                [dataclasses.asdict(p) for p in partitions], indent=2
            ),
            spec=json.dumps(spec, indent=2),
            command=" ".join(str(c) for c in command),
        )
    )
def parse_args() -> argparse.Namespace:
    """Builds the command-line parser and parses the process arguments.

    A subcommand is required; it is stored in the `action` attribute of the
    returned namespace. The only subcommand is `create`, which takes the
    positional paths `spec` and `out_dir`.
    """
    root = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=__doc__,
    )
    actions = root.add_subparsers(
        dest="action",
        required=True,
        title="Subcommands",
        help="Action to perform",
    )

    # `create` subcommand.
    create = actions.add_parser(
        "create", help="create GPT images from a given specification"
    )
    create.add_argument(
        "spec",
        type=pathlib.Path,
        help="layout specification JSON file; see --help for details",
    )
    out_dir_help = (
        "directory to write the resulting files to, with names:"
        " [mbr.bin, primary.bin, backup.bin, README.md]"
    )
    create.add_argument("out_dir", type=pathlib.Path, help=out_dir_help)

    return root.parse_args()
def main():
    """Entry point: parses the arguments and runs the chosen subcommand.

    Failures are reported by raising an exception.
    """
    logging.basicConfig(level=logging.INFO)
    args = parse_args()

    if args.action != "create":
        raise ValueError(f"Unknown action {args.action}")

    spec = json.loads(args.spec.read_text())
    create_gpt(spec, args.out_dir)
def _captured_output(error: subprocess.CalledProcessError) -> str:
    """Returns the stdout and stderr captured for a failed command as text.

    Commands run without `text=True` capture bytes rather than str, so each
    stream is decoded when necessary before the two are joined.
    """
    parts = []
    for stream in (error.stdout, error.stderr):
        if not stream:
            continue
        if isinstance(stream, bytes):
            stream = stream.decode(errors="replace")
        parts.append(stream)
    return "".join(parts)


if __name__ == "__main__":
    try:
        main()
        sys.exit(0)
    except KeyboardInterrupt:
        # Catch this to avoid printing exception context in this case.
        sys.exit(1)
    except subprocess.CalledProcessError as error:
        # If we failed with a subprocess that captured stdout/stderr,
        # dump it out to the user so they can see what went wrong.
        logging.error(
            "Command failed: `%s`", " ".join(str(c) for c in error.cmd)
        )
        output = _captured_output(error)
        if output:
            logging.error("\n----\n%s\n----", output)
        # Re-raise so the failure is still reported as an uncaught exception.
        raise