Source code for nobodd.disk

# nobodd: a boot configuration tool for the Raspberry Pi
#
# Copyright (c) 2023-2024 Dave Jones <dave.jones@canonical.com>
# Copyright (c) 2023-2024 Canonical Ltd.
#
# SPDX-License-Identifier: GPL-3.0

import os
import mmap
import uuid
import warnings
from binascii import crc32
from collections.abc import Mapping

from . import lang
from .mbr import MBRHeader, MBRPartition
from .gpt import GPTHeader, GPTPartition


[docs] class DiskImage: """ Represents a disk image, specified by *filename_or_obj* which must be a :class:`str` or :class:`~pathlib.Path` naming the file, or a file-like object. If a file-like object is provided, it *must* have a :attr:`~io.IOBase.fileno` method which returns a valid file-descriptor number (the class uses :class:`~mmap.mmap` internally which requires a "real" file). The disk image is expected to be partitioned with either an `MBR`_ partition table or a `GPT`_. The partitions within the image can be enumerated with the :attr:`partitions` attribute. The instance can (and should) be used as a context manager; exiting the context will call the :meth:`close` method implicitly. If specified, *sector_size* is the size of sectors (in bytes) within the disk image. This defaults to 512 bytes, and should almost always be left alone. The *access* parameter controls the access used when constructing the memory mapping. This defaults to :data:`mmap.ACCESS_READ` for read-only access. If you wish to write to file-systems within the disk image, change this to :data:`mmap.ACCESS_WRITE`. You may also use :data:`mmap.ACCESS_COPY` for read-write mappings that don't actually affect the underlying disk image. .. note:: Please note that this library provides no means to re-partition disk images, just the ability to re-write files within FAT partitions. .. _MBR: https://en.wikipedia.org/wiki/Master_boot_record .. _GPT: https://en.wikipedia.org/wiki/GUID_Partition_Table """ def __init__(self, filename_or_obj, sector_size=512, access=mmap.ACCESS_READ): self._ss = sector_size if isinstance(filename_or_obj, os.PathLike): filename_or_obj = filename_or_obj.__fspath__() self._opened = isinstance(filename_or_obj, str) if self._opened: self._file = open( filename_or_obj, 'r+b' if access == mmap.ACCESS_WRITE else 'rb') else: self._file = filename_or_obj self._map = mmap.mmap(self._file.fileno(), 0, access=access) self._mem = memoryview(self._map) self._partitions = None def __repr__(self): return ( f'<{self.__class__.__name__} file={self._file!r} ' f'style={self.style!r} signature={self.signature!r}>') def __enter__(self): return self def __exit__(self, *exc): self.close()
[docs] def close(self): """ Destroys the memory mapping used on the file provided. If the file was opened by this class, it will also be closed. This method is idempotent and is implicitly called when the instance is used as a context manager. .. note:: All mappings derived from this one *must* be closed before calling this method. By far the easiest means of arranging this is to consistently use context managers with all instances derived from this. """ if self._map is not None: self._mem.release() self._map.close() if self._opened: self._file.close() self._partitions = None self._map = None self._mem = None self._file = None
@property def style(self): """ The style of partition table in use on the disk image. Will be one of the strings, 'gpt' or 'mbr'. """ return self.partitions.style @property def signature(self): """ The identifying signature of the disk. In the case of a GPT partitioned disk, this is a :class:`~uuid.UUID`. In the case of MBR, this is a 32-bit integer number. """ return self.partitions.signature @property def partitions(self): """ Provides access to the partitions in the image as a :class:`~collections.abc.Mapping` of partition number to :class:`DiskPartition` instances. .. warning:: Disk partition numbers start from 1 and need not be contiguous, or ordered. For example, it is perfectly valid to have partition 1 occur later on disk than partition 2, for partition 3 to be undefined, and partition 4 to be defined between partition 1 and 2. The partition number is essentially little more than an arbitrary key. In the case of MBR partition tables, it is particularly common to have missing partition numbers as the primary layout only permits 4 partitions. Hence, the "extended partitions" scheme numbers partitions from 5. However, if not all primary partitions are defined, there will be a "jump" from, say, partition 2 to partition 5. """ # This is a bit hacky, but reliable enough for our purposes. We check # for the "EFI PART" signature at the start of sector 1 and, if we find # it, we assume we're dealing with GPT. We don't check for a protective # or hybrid MBR because we wouldn't use it in any case. Otherwise we, # check for a valid MBR boot-signature at the appropriate offset. # Failing both of these, we raise an error. if self._partitions is None: for cls in (DiskPartitionsGPT, DiskPartitionsMBR): try: self._partitions = cls(self._mem, self._ss) except ValueError: pass else: break else: raise ValueError(lang._( 'Unable to determine partitioning scheme in use by ' '{self._file}'.format(self=self))) return self._partitions
[docs] class DiskPartition: """ Represents an individual disk partition within a :class:`DiskImage`. Instances of this class are returned as the values of the mapping provided by :attr:`DiskImage.partitions`. Instances can (and should) be used as a context manager to implicitly close references upon exiting the context. """ def __init__(self, mem, label, type): self._mem = mem self._label = label self._type = type def __repr__(self): return ( f'<{self.__class__.__name__} size={self._mem.nbytes} ' f'label={self._label!r} type={self._type!r}>') def __enter__(self): return self def __exit__(self, *exc): self.close()
[docs] def close(self): """ Release the internal :class:`memoryview` reference. This method is idempotent and is implicitly called when the instance is used as a context manager. """ self._mem.release()
@property def type(self): """ The type of the partition. For `GPT`_ partitions, this will be a :class:`uuid.UUID` instance. For `MBR`_ partitions, this will be an :class:`int`. """ return self._type @property def label(self): """ The label of the partition. `GPT`_ partitions may have a 36 character unicode label. `MBR`_ partitions do not have a label, so the string "Partition {num}" will be used instead (where *{num}* is the partition number). """ return self._label @property def data(self): """ Returns a buffer (specifically, a :class:`memoryview`) covering the contents of the partition in the owning :class:`DiskImage`. """ return self._mem
class DiskPartitions(Mapping): """ Abstract base class for the classes that handle specific partition layouts. Provides common handlers for :func:`repr` amongst other things. """ def __repr__(self): partitions = '\n'.join(f'{key}: {part!r},' for key, part in self.items()) return f'{self.__class__.__name__}({{\n{partitions}\n}})'
[docs] class DiskPartitionsGPT(DiskPartitions): """ Provides a :class:`~collections.abc.Mapping` from partition number to :class:`DiskPartition` instances for a `GPT`_. *mem* is the buffer covering the whole disk image. *sector_size* specifies the sector size of the disk image, which should almost always be left at the default of 512 bytes. """ style = 'gpt' def __init__(self, mem, sector_size=512): header = GPTHeader.from_buffer(mem, sector_size * 1) if header.signature != b'EFI PART': raise ValueError(lang._('Bad GPT signature')) if header.revision != 0x10000: raise ValueError(lang._('Unrecognized GPT version')) if header.header_size != GPTHeader._FORMAT.size: raise ValueError(lang._('Bad GPT header size')) if crc32(bytes(header._replace(header_crc32=0))) != header.header_crc32: raise ValueError(lang._('Bad GPT header CRC32')) self._mem = mem self._header = header self._ss = sector_size @property def signature(self): return uuid.UUID(bytes_le=self._header.disk_guid) def _get_table(self): start = self._header.part_table_lba table_sectors = (( (self._header.part_table_size * self._header.part_entry_size) + self._ss - 1) // self._ss) return self._mem[self._ss * start:self._ss * (start + table_sectors)] def __len__(self): with self._get_table() as table: count = 0 for offset in range(0, len(table), self._header.part_entry_size): entry = GPTPartition.from_buffer(table, offset) if entry.type_guid != b'\x00' * 16: count += 1 return count def __getitem__(self, index): if not 1 <= index <= self._header.part_table_size: raise KeyError(index) with self._get_table() as table: entry = GPTPartition.from_buffer( table, self._header.part_entry_size * (index - 1)) if entry.part_guid == b'\x00' * 16: raise KeyError(index) start = self._ss * entry.first_lba finish = self._ss * (entry.last_lba + 1) return DiskPartition( mem=self._mem[start:finish], type=uuid.UUID(bytes_le=entry.type_guid), label=entry.part_label.decode('utf-16-le').rstrip('\x00')) def __iter__(self): with self._get_table() as table: for index in range(self._header.part_table_size): entry = GPTPartition.from_buffer( table, self._header.part_entry_size * index) if entry.part_guid == b'\x00' * 16: continue yield index + 1
[docs] class DiskPartitionsMBR(DiskPartitions): """ Provides a :class:`~collections.abc.Mapping` from partition number to :class:`DiskPartition` instances for a `MBR`_. *mem* is the buffer covering the whole disk image. *sector_size* specifies the sector size of the disk image, which should almost always be left at the default of 512 bytes. """ style = 'mbr' def __init__(self, mem, sector_size=512): header = MBRHeader.from_buffer(mem, offset=0) if header.boot_sig != 0xAA55: raise ValueError(lang._('Bad MBR signature')) if header.zero != 0: raise ValueError(lang._('Bad MBR zero field')) self._mem = mem self._header = header self._ss = sector_size if len(self) == 1 and self[1].type == 0xEE: raise ValueError(lang._('Protective MBR; use GPT instead')) @property def signature(self): return self._header.disk_sig def _get_logical(self, ext_offset): logical_offset = ext_offset while True: ebr = MBRHeader.from_buffer(self._mem, logical_offset * self._ss) if ebr.boot_sig != 0xAA55: raise ValueError(lang._('Bad EBR signature')) # Yield the logical partition part = MBRPartition.from_bytes(ebr.partition_1) part = part._replace(first_lba=part.first_lba + logical_offset) yield part part = MBRPartition.from_bytes(ebr.partition_2) if part.part_type == 0x00 and part.first_lba == 0: break elif part.part_type not in (0x05, 0x0F): raise ValueError(lang._( 'Second partition in EBR at LBA {logical_offset} is not ' 'another EBR or a terminal' .format(logical_offset=logical_offset))) logical_offset = part.first_lba + ext_offset def _get_primary(self): mbr = self._header extended = False for num, buf in enumerate(mbr.partitions, start=1): part = MBRPartition.from_bytes(buf) if part.part_type in (0x05, 0x0F): if extended: warnings.warn(UserWarning(lang._( 'Multiple extended partitions found'))) extended = True yield from enumerate(self._get_logical(part.first_lba), start=5) elif part.part_type != 0x00: yield num, part def __len__(self): return sum(1 for num, part in self._get_primary()) def __getitem__(self, index): for num, part in self._get_primary(): if num == index: last_lba = part.first_lba + part.part_size return DiskPartition( mem=self._mem[self._ss * part.first_lba:self._ss * last_lba], type=part.part_type, label=f'Partition {num}') raise KeyError(index) def __iter__(self): for num, part in self._get_primary(): yield num