# nobodd: a boot configuration tool for the Raspberry Pi
#
# Copyright (c) 2023-2024 Dave Jones <dave.jones@canonical.com>
# Copyright (c) 2023-2024 Canonical Ltd.
#
# SPDX-License-Identifier: GPL-3.0
import io
import os
import re
import errno
import struct
import weakref
import warnings
import datetime as dt
from abc import abstractmethod
from collections import abc
from itertools import islice
from . import lang
from .fat import (
BIOSParameterBlock,
ExtendedBIOSParameterBlock,
FAT32BIOSParameterBlock,
FAT32InfoSector,
DirectoryEntry,
LongFilenameEntry,
lfn_valid,
lfn_checksum,
)
from .path import FatPath, get_cluster
from .tools import (
pairwise,
encode_timestamp,
any_match,
exclude,
)
[docs]
class FatWarning(Warning):
"""
Base class for warnings issued by :class:`FatFileSystem`.
"""
[docs]
class DirtyFileSystem(FatWarning):
"""
Raised when opening a FAT file-system that has the "dirty" flag set in the
second entry of the FAT.
"""
[docs]
class DamagedFileSystem(FatWarning):
"""
Raised when opening a FAT file-system that has the I/O errors flag set in
the second entry of the FAT.
"""
[docs]
class OrphanedLongFilename(FatWarning):
"""
Raised when a :class:`~nobodd.fat.LongFilenameEntry` is found with a
mismatched checksum, terminal flag, out of order index, etc. This usually
indicates an orphaned entry as the result of a non-LFN aware file-system
driver manipulating a directory.
"""
[docs]
class BadLongFilename(FatWarning):
"""
Raised when a :class:`~nobodd.fat.LongFilenameEntry` is unambiguously
corrupted, e.g. including a non-zero cluster number, in a way that would
not be caused by a non-LFN aware file-system driver.
"""
# The following references were invaluable in constructing this implementation;
# the wikipedia page on the Design of the FAT File system [1], Jonathan
# de Boyne Pollard's notes on determination of FAT widths [2], the
# Microsoft Extensible Firmware Initiative FAT32 File System Specification [3],
# and Electronic Lives Mfg.'s notes on the FAT File system [4].
#
# [1]: https://en.wikipedia.org/wiki/Design_of_the_FAT_file_system
# [2]: http://homepage.ntlworld.com/jonathan.deboynepollard/FGA/determining-fat-widths.html
# [3]: http://download.microsoft.com/download/1/6/1/161ba512-40e2-4cc9-843a-923143f3456c/fatgen103.doc
# [4]: http://elm-chan.org/docs/fat_e.html
#
# Future maintainers, please note [2] is a dead link at the time of writing;
# use archive.org to retrieve. [1] is the best starting point although it does
# attempt to drown the casual reader in detail, a lot of which can be ignored
# (I have no interest in supporting, for example, DR-DOS' DELWATCH mechanism,
# or CP/M-86's user attributes).
#
# [3] is extremely useful in some places, though you have to put up with the
# slighly condescending tone as the author argues that everyone else habitually
# gets it wrong, and Microsoft's detection algorithms are The One True Way
# (reading [2] provides a good antidote to this).
#
# Unfortunately, in other places [3] is dreadfully vague for a spec (e.g. valid
# SFN / LFN characters). Refer back to [1] for these. [4] is obviously partly
# drawn from [3], but adds some extremely important notes that others have
# omitted (or not noticed), such as the fact that volume labels can
# legitimately duplicate the name of a later file in the root directory.
[docs]
class FatFileSystem:
"""
Represents a `FAT`_ file-system, contained at the start of the buffer
object *mem*.
This class supports the FAT-12, FAT-16, and FAT-32 formats, and will
automatically determine which to use from the headers found at the start of
*mem*. The type in use may be queried from :attr:`fat_type`. Of primary use
is the :attr:`root` attribute which provides a
:class:`~nobodd.path.FatPath` instance representing the root directory of
the file-system.
Instances can (and should) be used as a context manager; exiting the
context will call the :meth:`close` method implicitly. If certain header
bits are set, :exc:`DamagedFileSystem` and :exc:`DirtyFileSystem` warnings
may be generated upon opening.
If *atime* is :data:`False`, the default, then accesses to files will *not*
update the atime field in file meta-data (when the underlying *mem* mapping
is writable). Finally, *encoding* specifies the character set used for
decoding and encoding DOS short filenames.
.. _FAT: https://en.wikipedia.org/wiki/Design_of_the_FAT_file_system
"""
def __init__(self, mem, atime=False, encoding='iso-8859-1'):
self._fat = None
self._data = None
self._root = None
mem = memoryview(mem)
try:
self._fat_type, bpb, ebpb, ebpb_fat32 = fat_type(mem)
self._atime = atime
self._encoding = encoding
# TODO: Replace with root volume label if == b'NO NAME '
self._label = ebpb.volume_label.decode(
encoding, 'replace').rstrip(' ')
total_sectors = bpb.fat16_total_sectors or bpb.fat32_total_sectors
if total_sectors == 0 and ebpb.extended_boot_sig == 0x29:
# FAT32 with >2**32 sectors uses file-system label as an 8-byte
# int
total_sectors, = struct.unpack('<Q', ebpb.file_system)
fat_size = (
bpb.sectors_per_fat if ebpb_fat32 is None else
ebpb_fat32.sectors_per_fat) * bpb.bytes_per_sector
if fat_size == 0:
raise ValueError(lang._(
'{fat_type} sectors per FAT is 0'
.format(fat_type=self._fat_type.upper())))
root_size = bpb.max_root_entries * DirectoryEntry._FORMAT.size
if root_size % bpb.bytes_per_sector:
raise ValueError(lang._(
'Max. root entries, {bpb.max_root_entries} creates a root '
'directory region that is not a multiple of sector size, '
'{bpb.bytes_per_sector}'.format(bpb=bpb)))
info_offset = (
ebpb_fat32.info_sector * bpb.bytes_per_sector
if ebpb_fat32 is not None
and ebpb_fat32.info_sector not in (0, 0xFFFF)
else None)
end_offset = total_sectors * bpb.bytes_per_sector
fat_offset = bpb.reserved_sectors * bpb.bytes_per_sector
root_offset = fat_offset + (fat_size * bpb.fat_count)
data_offset = root_offset + root_size
self._fat = {
'fat12': Fat12Table,
'fat16': Fat16Table,
'fat32': Fat32Table,
}[self._fat_type](
mem[fat_offset:root_offset], fat_size,
mem[info_offset:info_offset + bpb.bytes_per_sector]
if info_offset is not None else None)
self._data = FatClusters(
mem[data_offset:end_offset],
bpb.bytes_per_sector * bpb.sectors_per_cluster)
if self._fat_type == 'fat32':
if ebpb_fat32 is None:
raise ValueError(lang._(
'File-system claims to be FAT32 but has no FAT32 EBPB'))
self._root = ebpb_fat32.root_dir_cluster
else:
self._root = mem[root_offset:root_offset + root_size]
except:
self.close()
raise
# Check the root directory is structured as expected. Apparently some
# "non-mainstream" operating systems can use a variable-sized root
# directory on FAT-12/16, but we're not expecting to deal with any of
# those
if self._fat_type == 'fat32' and bpb.max_root_entries != 0:
raise ValueError(lang._(
'Max. root entries must be 0 for {fat_type}'
.format(fat_type=self._fat_type.upper())))
elif self._fat_type != 'fat32' and bpb.max_root_entries == 0:
raise ValueError(lang._(
'Max. root entries must be non-zero for {fat_type}'
.format(fat_type=self._fat_type.upper())))
# Check the clean and damaged bits; these are only present on FAT-16
# and FAT-32 volumes
if self._fat_type != 'fat12':
clean = (
(self._fat_type == 'fat16' and (self._fat[1] & 0x8000)) or
(self._fat_type == 'fat32' and (self._fat[1] & 0x8000000)))
errors = not (
(self._fat_type == 'fat16' and (self._fat[1] & 0x4000)) or
(self._fat_type == 'fat32' and (self._fat[1] & 0x4000000)))
if not clean:
warnings.warn(DirtyFileSystem(lang._(
'File-system has the dirty bit set')))
if errors:
warnings.warn(DamagedFileSystem(lang._(
'File-system has the I/O errors bit set')))
def __repr__(self):
return (
f'<{self.__class__.__name__} label={self.label!r} '
f'fat_type={self.fat_type!r}>')
def __enter__(self):
return self
def __exit__(self, *exc):
self.close()
[docs]
def close(self):
"""
Releases the memory references derived from the buffer the instance was
constructed with. This method is idempotent.
"""
if self._fat is not None:
self._fat.close()
self._fat = None
if self._data is not None:
self._data.close()
self._data = None
if self._root is not None:
if self._fat_type != 'fat32':
self._root.release()
self._root = None
@property
def readonly(self):
"""
Returns :data:`True` if the underlying buffer is read-only.
"""
return self._data.readonly
[docs]
def open_dir(self, cluster):
"""
Opens the sub-directory in the specified *cluster*, returning a
:class:`FatDirectory` instance representing it.
.. warning::
This method is intended for internal use by the
:class:`~nobodd.path.FatPath` class.
"""
if cluster == 0:
if self._fat_type == 'fat32':
return Fat32Root(self, self._root, self._encoding)
elif self._fat_type == 'fat16':
return Fat16Root(self._root, self._encoding)
else:
return Fat12Root(self._root, self._encoding)
else:
return FatSubDirectory(self, cluster, self._encoding)
[docs]
def open_file(self, cluster, mode='rb'):
"""
Opens the file at the specified *cluster*, returning a :class:`FatFile`
instance representing it with the specified *mode*. Note that the
:class:`FatFile` instance returned by this method has no directory
entry associated with it.
.. warning::
This method is intended for internal use by the
:class:`~nobodd.path.FatPath` class, specifically for "files"
underlying the sub-directory structure which do not have an
associated size (other than that dictated by their FAT chain of
clusters).
"""
return FatFile.from_cluster(self, cluster, mode)
[docs]
def open_entry(self, index, entry, mode='rb'):
"""
Opens the specified *entry*, which must be a
:class:`~nobodd.fat.DirectoryEntry` instance, which must be a member of
*index*, an instance of :class:`FatDirectory`. Returns a
:class:`FatFile` instance associated with the specified *entry*. This
permits writes to the file to be properly recorded in the corresponding
directory entry.
.. warning::
This method is intended for internal use by the
:class:`~nobodd.path.FatPath` class.
"""
return FatFile.from_entry(self, index, entry, mode)
@property
def fat(self):
"""
A :class:`FatTable` sequence representing the FAT table itself.
.. warning::
This attribute is intended for internal use by the :class:`FatFile`
class, but may be useful for low-level exploration or manipulation
of FAT file-systems.
"""
return self._fat
@property
def clusters(self):
"""
A :class:`FatClusters` sequence representing the clusters containing
the data stored in the file-system.
.. warning::
This attribute is intended for internal use by the :class:`FatFile`
class, but may be useful for low-level exploration or manipulation
of FAT file-systems.
"""
return self._data
@property
def fat_type(self):
"""
Returns a :class:`str` indicating the type of `FAT`_ file-system
present. Returns one of "fat12", "fat16", or "fat32".
"""
return self._fat_type
@property
def label(self):
"""
Returns the label from the header of the file-system. This is an ASCII
string up to 11 characters long.
"""
return self._label
@property
def sfn_encoding(self):
"""
The encoding used for short (8.3) filenames. This defaults to
"iso-8859-1" but unfortunately there's no way of determining the
correct codepage for these.
"""
return self._encoding
@property
def atime(self):
"""
If the underlying mapping is writable, then atime (last access time)
will be updated upon reading the content of files, when this property
is :data:`True` (the default is :data:`False`).
"""
return self._atime
@property
def root(self):
"""
Returns a :class:`~nobodd.path.FatPath` instance (a
:class:`~pathlib.Path`-like object) representing the root directory of
the FAT file-system. For example::
from nobodd.disk import DiskImage
from nobodd.fs import FatFileSystem
with DiskImage('test.img') as img:
with FatFileSystem(img.partitions[1].data) as fs:
print('ls /')
for p in fs.root.iterdir():
print(p.name)
.. note::
This is intended to be the primary entry-point for querying and
manipulating the file-system at the high level. Only use the
:attr:`fat` and :attr:`clusters` attributes, and the various "open"
methods if you want to explore or manipulate the file-system at a
low level.
"""
return FatPath._from_index(self, self.open_dir(0))
[docs]
def fat_type(mem):
"""
Given a `FAT`_ file-system at the start of the buffer *mem*, determine its
type, and decode its headers. Returns a four-tuple containing:
* one of the strings "fat12", "fat16", or "fat32"
* a :class:`~nobodd.fat.BIOSParameterBlock` instance
* a :class:`~nobodd.fat.ExtendedBIOSParameterBlock` instance
* a :class:`~nobodd.fat.FAT32BIOSParameterBlock`, if one is present, or
:data:`None` otherwise
"""
fat_types = {
b'FAT ': None,
b'FAT12 ': 'fat12',
b'FAT16 ': 'fat16',
b'FAT32 ': 'fat32',
}
bpb = BIOSParameterBlock.from_buffer(mem)
ebpb = ExtendedBIOSParameterBlock.from_buffer(
mem, BIOSParameterBlock._FORMAT.size)
try:
fat_type = fat_types[ebpb.file_system]
if fat_type is not None:
return fat_type, bpb, ebpb, None
except KeyError:
pass
if ebpb.extended_boot_sig in (0x28, 0x29):
fat_type = fat_type_from_count(bpb, ebpb, None)
return fat_type, bpb, ebpb, None
ebpb_fat32 = FAT32BIOSParameterBlock.from_buffer(
mem, BIOSParameterBlock._FORMAT.size)
ebpb = ExtendedBIOSParameterBlock.from_buffer(
mem, BIOSParameterBlock._FORMAT.size +
FAT32BIOSParameterBlock._FORMAT.size)
try:
fat_type = fat_types[ebpb.file_system]
if fat_type is not None:
return fat_type, bpb, ebpb, ebpb_fat32
except KeyError:
pass
if ebpb.extended_boot_sig in (0x28, 0x29):
fat_type = fat_type_from_count(bpb, ebpb, ebpb_fat32)
return fat_type, bpb, ebpb, ebpb_fat32
raise ValueError(lang._(
'Could not find FAT file-system type or extended boot signature'))
[docs]
def fat_type_from_count(bpb, ebpb, ebpb_fat32):
"""
Derives the type of the `FAT`_ file-system when it cannot be determined
directly from the *bpb* and *ebpb* headers (the
:class:`~nobodd.fat.BIOSParameterBlock`, and
:class:`~nobodd.fat.ExtendedBIOSParameterBlock` respectively).
Uses `known limits`_ on the number of clusters to derive the type of FAT in
use. Returns one of the strings "fat12", "fat16", or "fat32".
.. _known limits:
https://en.wikipedia.org/wiki/Design_of_the_FAT_file_system#Size_limits
"""
total_sectors = bpb.fat16_total_sectors or bpb.fat32_total_sectors
if total_sectors == 0 and ebpb.extended_boot_sig == 0x29:
# FAT32 with >2**32 sectors uses file-system label as an 8-byte int
total_sectors, = struct.unpack('<Q', ebpb.file_system)
fat_sectors = (
bpb.fat_count *
(bpb.sectors_per_fat if ebpb_fat32 is None else
ebpb_fat32.sectors_per_fat))
root_sectors = (
(bpb.max_root_entries * DirectoryEntry._FORMAT.size) +
(bpb.bytes_per_sector - 1)) // bpb.bytes_per_sector
data_offset = bpb.reserved_sectors + fat_sectors + root_sectors
data_clusters = (total_sectors - data_offset) // bpb.sectors_per_cluster
return (
'fat12' if data_clusters < 4085 else
'fat16' if data_clusters < 65525 else
'fat32')
[docs]
class FatTable(abc.MutableSequence):
"""
Abstract :class:`~collections.abc.MutableSequence` class representing the
FAT table itself.
This is the basis for :class:`Fat12Table`, :class:`Fat16Table`, and
:class:`Fat32Table`. While all the implementations are potentially mutable
(if the underlying memory mapping is writable), only direct replacement of
FAT entries is valid. Insertion and deletion will raise :exc:`TypeError`.
A concrete class is constructed by :class:`FatFileSystem` (based on the
type of FAT format found). The :meth:`chain` method is used by
:class:`FatFile` (and indirectly :class:`FatSubDirectory`) to discover the
chain of clusters that make up a file (or sub-directory). The :meth:`free`
method is used by writable :class:`FatFile` instances to find the next free
cluster to write to. The :meth:`mark_free` and :meth:`mark_end` methods are
used to mark a clusters as being free or as the terminal cluster of a
file.
"""
min_valid = None
max_valid = None
end_mark = None
def __enter__(self):
return self
def __exit__(self, *exc):
self.close()
def close(self):
if self._tables:
for table in self._tables:
table.release()
self._tables = ()
def __len__(self):
return len(self._tables[0])
def __delitem__(self, cluster):
raise TypeError(lang._('FAT length is immutable'))
@property
def readonly(self):
return self._tables[0].readonly
[docs]
@abstractmethod
def get_all(self, cluster):
"""
Returns the value of *cluster* in all copies of the FAT, as a
:class:`tuple` (naturally, under normal circumstances, these should all
be equal).
"""
raise NotImplementedError
[docs]
def insert(self, cluster, value):
"""
Raises :exc:`TypeError`; the FAT length is immutable.
"""
raise TypeError(lang._('FAT length is immutable'))
[docs]
def mark_free(self, cluster):
"""
Marks *cluster* as free (this simply sets *cluster* to 0 in the FAT).
"""
self[cluster] = 0
[docs]
def mark_end(self, cluster):
"""
Marks *cluster* as the end of a chain. The value used to indicate the
end of a chain is specific to the FAT size.
"""
self[cluster] = self.end_mark
[docs]
def chain(self, start):
"""
Generator method which yields all the clusters in the chain starting at
*start*.
"""
cluster = start
while self.min_valid <= cluster <= self.max_valid:
yield cluster
cluster = self[cluster]
[docs]
def free(self):
"""
Generator that scans the FAT for free clusters, yielding each as it is
found. Iterating to the end of this generator raises :exc:`OSError`
with the code ENOSPC (out of space).
"""
for cluster, value in enumerate(self):
if value == 0 and self.min_valid < cluster:
yield cluster
if cluster >= self.max_valid:
break
# If we reach this point without the caller having broken out of their
# loop, we've run out of space so raise the appropriate exception
raise OSError(errno.ENOSPC, os.strerror(errno.ENOSPC))
[docs]
class Fat12Table(FatTable):
"""
Concrete child of :class:`FatTable` for FAT-12 file-systems.
.. autoattribute:: min_valid
.. autoattribute:: max_valid
.. autoattribute:: end_mark
"""
min_valid = 0x002
max_valid = 0xFEF
end_mark = 0xFFF
def __init__(self, mem, fat_size, info_mem=None):
super().__init__()
assert info_mem is None
self._tables = tuple(
mem[offset:offset + fat_size]
for offset in range(0, len(mem), fat_size)
)
def __len__(self):
return (super().__len__() * 2) // 3
[docs]
def get_all(self, cluster):
try:
offset = cluster + (cluster >> 1)
if cluster % 2:
return tuple(
struct.unpack_from('<H', t, offset)[0] >> 4
for t in self._tables
)
else:
return tuple(
struct.unpack_from('<H', t, offset)[0] & 0x0FFF
for t in self._tables
)
except struct.error:
raise IndexError(lang._(
'{offset} out of bounds'.format(offset=offset)))
def __getitem__(self, cluster):
try:
offset = cluster + (cluster >> 1)
if cluster % 2:
return struct.unpack_from(
'<H', self._tables[0], offset)[0] >> 4
else:
return struct.unpack_from(
'<H', self._tables[0], offset)[0] & 0x0FFF
except struct.error:
raise IndexError(lang._(
'{offset} out of bounds'.format(offset=offset)))
def __setitem__(self, cluster, value):
if not 0x000 <= value <= 0xFFF:
raise ValueError(lang._(
'{value} is outside range 0x000..0xFFF'.format(value=value)))
try:
offset = cluster + (cluster >> 1)
if cluster % 2:
value <<= 4
value |= struct.unpack_from(
'<H', self._tables[0], offset)[0] & 0x000F
else:
value |= struct.unpack_from(
'<H', self._tables[0], offset)[0] & 0xF000
for table in self._tables:
struct.pack_into('<H', table, offset, value)
except struct.error:
raise IndexError(lang._(
'{offset} out of bounds'.format(offset=offset)))
[docs]
class Fat16Table(FatTable):
"""
Concrete child of :class:`FatTable` for FAT-16 file-systems.
.. autoattribute:: min_valid
.. autoattribute:: max_valid
.. autoattribute:: end_mark
"""
min_valid = 0x0002
max_valid = 0xFFEF
end_mark = 0xFFFF
def __init__(self, mem, fat_size, info_mem=None):
super().__init__()
assert info_mem is None
self._tables = tuple(
mem[offset:offset + fat_size].cast('H')
for offset in range(0, len(mem), fat_size)
)
[docs]
def get_all(self, cluster):
return tuple(t[cluster] for t in self._tables)
def __getitem__(self, cluster):
return self._tables[0][cluster]
def __setitem__(self, cluster, value):
if not 0x0000 <= value <= 0xFFFF:
raise ValueError(lang._(
'{value} is outside range 0x0000..0xFFFF'.format(value=value)))
for table in self._tables:
table[cluster] = value
[docs]
class Fat32Table(FatTable):
"""
Concrete child of :class:`FatTable` for FAT-32 file-systems.
.. autoattribute:: min_valid
.. autoattribute:: max_valid
.. autoattribute:: end_mark
"""
min_valid = 0x00000002
max_valid = 0x0FFFFFEF
end_mark = 0x0FFFFFFF
def __init__(self, mem, fat_size, info_mem=None):
super().__init__()
self._tables = tuple(
mem[offset:offset + fat_size].cast('I')
for offset in range(0, len(mem), fat_size)
)
self._info = None
self._info_mem = None
if info_mem is not None:
info = FAT32InfoSector.from_buffer(info_mem)
if (
info.sig1 == b'RRaA' and
info.sig2 == b'rrAa' and
info.sig3 == b'\0\0\x55\xAA'):
self._info = info
self._info_mem = info_mem
def close(self):
super().close()
if self._info_mem is not None:
self._info_mem.release()
self._info_mem = None
self._info = None
def _alloc(self, cluster):
if self._info is not None:
if 0 < self._info.free_clusters <= len(self):
self._info = self._info._replace(
free_clusters=self._info.free_clusters - 1,
last_alloc=cluster)
self._info.to_buffer(self._info_mem)
def _dealloc(self, cluster):
if self._info is not None:
if 0 <= self._info.free_clusters < len(self):
self._info = self._info._replace(
free_clusters=self._info.free_clusters + 1)
self._info.to_buffer(self._info_mem)
[docs]
def free(self):
if self._info is not None:
last_alloc = self._info.last_alloc
if self.min_valid <= last_alloc < len(self):
# If we have a valid info-sector, start scanning from the last
# allocated cluster plus one
for cluster in range(last_alloc + 1, len(self)):
if self[cluster] == 0 and self.min_valid < cluster:
yield cluster
if cluster >= self.max_valid:
break
yield from super().free()
[docs]
def get_all(self, cluster):
return tuple(t[cluster] & 0x0FFFFFFF for t in self._tables)
def __getitem__(self, cluster):
return self._tables[0][cluster] & 0x0FFFFFFF
def __setitem__(self, cluster, value):
if not 0x00000000 <= value <= 0x0FFFFFFF:
raise ValueError(lang._(
'{value} is outside range 0x00000000..0x0FFFFFFF'
.format(value=value)))
old_value = self._tables[0][cluster]
if not old_value and value:
self._alloc(cluster)
elif old_value and not value:
self._dealloc(cluster)
for table in self._tables:
table[cluster] = (old_value & 0xF0000000) | (value & 0x0FFFFFFF)
[docs]
class FatClusters(abc.MutableSequence):
"""
:class:`~collections.abc.MutableSequence` representing the clusters of
the file-system itself.
While the sequence is mutable, clusters cannot be deleted or inserted, only
read and (if the underlying buffer is writable) re-written.
"""
def __init__(self, mem, cluster_size):
self._mem = mem
self._cs = cluster_size
def __enter__(self):
return self
def __exit__(self, *exc):
self.close()
def close(self):
if self._mem is not None:
self._mem.release()
self._mem = None
@property
def size(self):
"""
Returns the size (in bytes) of clusters in the file-system.
"""
return self._cs
@property
def readonly(self):
"""
Returns :data:`True` if the underlying buffer is read-only.
"""
return self._mem.readonly
def __len__(self):
return len(self._mem) // self._cs
def __getitem__(self, cluster):
# The first data cluster is numbered 2, hence the offset below.
# Clusters 0 and 1 are special and don't exist in the data portion of
# the file-system
if not 2 <= cluster < len(self) + 2:
raise IndexError(cluster)
offset = (cluster - 2) * self._cs
return self._mem[offset:offset + self._cs]
def __setitem__(self, cluster, value):
# See above
if not 2 <= cluster < len(self) + 2:
raise IndexError(cluster)
offset = (cluster - 2) * self._cs
self._mem[offset:offset + self._cs] = value
def __delitem__(self, cluster):
raise TypeError(lang._('FS length is immutable'))
[docs]
def insert(self, cluster, value):
"""
Raises :exc:`TypeError`; the FS length is immutable.
"""
raise TypeError(lang._('FS length is immutable'))
[docs]
class FatDirectory(abc.MutableMapping):
"""
An abstract :class:`~collections.abc.MutableMapping` representing a `FAT
directory`_. The mapping is ostensibly from filename to
:class:`~nobodd.fat.DirectoryEntry` instances, but there are several
oddities to be aware of.
In VFAT, many files effectively have *two* filenames: the original DOS
"short" filename (SFN hereafter) and the VFAT "long" filename (LFN
hereafter). All files have an SFN; any file may optionally have an LFN. The
SFN is stored in the :class:`~nobodd.fat.DirectoryEntry` which records
details of the file (mode, size, cluster, etc). The optional LFN is stored
in leading :class:`~nobodd.fat.LongFilenameEntry` records.
Even when :class:`~nobodd.fat.LongFilenameEntry` records do *not* precede a
:class:`~nobodd.fat.DirectoryEntry`, the file may still have an LFN that
differs from the SFN in case only, recorded by flags in the
:class:`~nobodd.fat.DirectoryEntry`. Naturally, some files still only have
one filename because the LFN doesn't vary in case from the SFN, e.g. the
special directory entries "." and "..", and anything which conforms to
original DOS naming rules like "README.TXT".
For the purposes of listing files, most FAT implementations (including this
one) ignore the SFNs. Hence, iterating over this mapping will *not* yield
the SFNs as keys (unless the SFN is equal to the LFN), and they are *not*
counted in the length of the mapping. However, for the purposes of testing
existence, opening, etc., FAT implementations allow the use of SFNs. Hence,
testing for membership, or manipulating entries via the SFN will work with
this mapping, and will implicitly manipulate the associated LFNs (e.g.
deleting an entry via a SFN key will also delete the associated LFN key).
In other words, if a file has a distinct LFN and SFN, it has *two* entries
in the mapping (a "visible" LFN entry, and an "invisible" SFN entry).
Further, note that FAT is case retentive (for LFNs; SFNs are folded
uppercase), but not case sensitive. Hence, membership tests and retrieval
from this mapping are case insensitive with regard to keys.
Finally, note that the values in the mapping are always instances of
:class:`~nobodd.fat.DirectoryEntry`. :class:`~nobodd.fat.LongFilenameEntry`
instances are neither accepted nor returned; these are managed internally.
.. _FAT directory:
https://en.wikipedia.org/wiki/Design_of_the_FAT_file_system#Directory_table
.. autoattribute:: MAX_SFN_SUFFIX
"""
MAX_SFN_SUFFIX = 0xFFFF
SFN_VALID = re.compile(b"[^A-Z0-9 !#$%&'()@^_`{}~\x80-\xFF-]")
__slots__ = ('_encoding',)
@abstractmethod
def _get_cluster(self):
raise NotImplementedError
[docs]
@abstractmethod
def _iter_entries(self):
"""
Abstract generator that is expected to yield successive offsets and the
entries at those offsets as :class:`~nobodd.fat.DirectoryEntry`
instances or :class:`~nobodd.fat.LongFilenameEntry` instances, as
appropriate.
All instances must be yielded, in the order they appear on disk,
regardless of whether they represent deleted, orphaned, corrupted,
terminal, or post-terminal entries.
"""
raise NotImplementedError
[docs]
@abstractmethod
def _update_entry(self, offset, entry):
"""
Abstract method which is expected to (re-)write *entry* (a
:class:`~nobodd.fat.DirectoryEntry` or
:class:`~nobodd.fat.LongFilenameEntry` instance) at the specified
*offset* in the directory.
"""
raise NotImplementedError
[docs]
def _split_entries(self, entries):
"""
Given *entries*, a sequence of :class:`~nobodd.fat.LongFilenameEntry`
instances, ending with a single :class:`~nobodd.fat.DirectoryEntry` (as
would typically be found in a FAT directory index), return the decoded
long filename, short filename, and the directory entry record as a
3-tuple.
If no long filename entries are present, the long filename will be
equal to the short filename (but may have lower-case parts).
.. note::
This function also carries out several checks, including the
filename checksum, that all checksums match, that the number of
entries is valid, etc. Any violations found may raise warnings
including :exc:`OrphanedLongFilename` and :exc:`BadLongFilename`.
"""
# The extration of the long filename could be simpler, but let's do all
# the checks we can (the structure includes a *lot* of redundancy for
# checking things!)
assert entries
*lfn_entries, entry = entries
assert isinstance(entry, DirectoryEntry)
checksum = lfn_checksum(entry.filename, entry.ext)
lfn = self._join_lfn_entries(lfn_entries, checksum)
if lfn is not None:
lfn = lfn.decode('utf-16le').rstrip('\uffff')
# There may be one trailing NUL char, but there may not if the
# filename fits perfectly in a LFN structure
if lfn[-1:] == '\x00':
lfn = lfn[:-1]
if not lfn:
warnings.warn(BadLongFilename(lang._(
'empty LongFilenameEntry decoded')))
lfn = None
sfn = entry.filename.rstrip(b' ')
# If initial char of the filename is 0xE5 (which is reserved to
# indicate a deleted entry) then it's encoded as 0x05 (since DOS 3.0)
if sfn[0] == 0x05:
sfn = b'\xE5' + sfn[1:]
sfn = sfn.decode(self._encoding)
ext = entry.ext.rstrip(b' ').decode(self._encoding)
# Bits 3 & 4 of attr2 are used by Windows NT (basically any modern
# Windows) to indicate if the short filename (in the absence of long
# filename entries) has upper / lower-case portions
if lfn is None:
lfn = sfn.lower() if entry.attr2 & 0b1000 else sfn
if ext:
lfn = lfn + '.' + (ext.lower() if entry.attr2 & 0b10000 else ext)
if ext:
sfn = sfn + '.' + ext
return lfn, sfn, entry
[docs]
def _join_lfn_entries(self, entries, checksum, sequence=0, lfn=b''):
"""
Given *entries*, a sequence of :class:`~nobodd.fat.LongFilenameEntry`
instances, decode the long filename encoded within them, ensuring that
all the invariants (sequence number, checksums, terminal flag, etc.)
are obeyed.
Returns the decoded (:class:`str`) long filename, or :data:`None` if
no valid long filename can be found. Emits various warnings if
invalid entries are encountered during decoding, including
:exc:`OrphanedLongFilename` and :exc:`BadLongFilename`.
"""
if not entries:
return None
head, *entries = entries
if head.first_cluster != 0:
warnings.warn(BadLongFilename(lang._(
'LongFilenameEntry.first_cluster is non-zero: '
'{head.first_cluster}'.format(head=head))))
return self._join_lfn_entries(entries, checksum)
if head.checksum != checksum:
warnings.warn(OrphanedLongFilename(lang._(
'mismatched LongFilenameEntry.checksum: {checksum} != '
'{head.checksum}'.format(checksum=checksum, head=head))))
return self._join_lfn_entries(entries, checksum)
if head.sequence & 0x40:
if lfn:
# NOTE: Add the new terminal back onto the list to be
# processed. All other failures (below) don't need to do this
# because they're definitely non-terminal and thus can't start
# a valid LongFilenameEntry run
warnings.warn(OrphanedLongFilename(lang._(
'new terminal LongFilenameEntry')))
return self._join_lfn_entries([head] + entries, checksum)
sequence = head.sequence & 0b11111
if not sequence:
warnings.warn(BadLongFilename(lang._(
'LongFilenameEntry.sequence is zero')))
return self._join_lfn_entries(entries, checksum)
elif head.sequence != sequence:
warnings.warn(OrphanedLongFilename(lang._(
'unexpected LongFilenameEntry.sequence: {sequence} != '
'{head.sequence}'.format(sequence=sequence, head=head))))
return self._join_lfn_entries(entries, checksum)
lfn = head.name_1 + head.name_2 + head.name_3 + lfn
if sequence == 1:
if entries:
warnings.warn(OrphanedLongFilename(lang._(
'more LongFilenameEntry after sequence: 1')))
return self._join_lfn_entries(entries, checksum)
return lfn
else:
if not entries:
warnings.warn(OrphanedLongFilename(lang._(
'missing LongFilenameEntry after sequence: {sequence}'
.format(sequence=sequence))))
return self._join_lfn_entries(entries, checksum, sequence - 1, lfn)
[docs]
def _prefix_entries(self, filename, entry):
"""
Given *entry*, a :class:`~nobodd.fat.DirectoryEntry`, generate the
necessary :class:`~nobodd.fat.LongFilenameEntry` instances (if any),
that are necessary to associate *entry* with the specified *filename*.
This function merely constructs the instances, ensuring the (many,
convoluted!) rules are followed, including that the short filename, if
one is generated, is unique in this directory, and the long filename is
encoded and check-summed appropriately.
.. note::
The *filename* and *ext* fields of *entry* are ignored by this
method. The only filename that is considered is the one explicitly
passed in which becomes the basis for the long filename entries
*and* the short filename stored within the *entry* itself.
The return value is the sequence of long filename entries and the
modified directory entry in the order they should appear on disk.
"""
lfn, sfn, ext, attr2 = self._get_names(filename)
if lfn:
checksum = lfn_checksum(sfn, ext)
entries = [
LongFilenameEntry(
sequence=part,
name_1=lfn[offset:offset + 10],
attr=0xF,
checksum=checksum,
name_2=lfn[offset + 10:offset + 22],
first_cluster=0,
name_3=lfn[offset + 22:offset + 26]
)
for part, offset
in enumerate(range(0, len(lfn), 26), start=1)
]
entries.reverse()
# Add terminal marker to "last" entry
entries[0] = entries[0]._replace(
sequence=0x40 | entries[0].sequence)
else:
entries = []
entries.append(entry._replace(filename=sfn, ext=ext, attr2=attr2))
return entries
[docs]
def _get_names(self, filename):
"""
Given a *filename*, generate an appropriately encoded long filename
(encoded in little-endian UCS-2), short filename (encoded in the
file-system's SFN encoding), extension, and the case attributes. The
result is a 4-tuple: ``lfn, sfn, ext, attr``.
``lfn``, ``sfn``, and ``ext`` will be :class:`bytes` strings, and
``attr`` will be an :class:`int`. If *filename* is capable of being
represented as a short filename only (potentially with non-zero case
attributes), ``lfn`` in the result will be zero-length.
"""
# sfn == short filename, lfn == long filename, ext == extension
if filename in ('.', '..'):
sfn, ext = filename.encode(self._encoding), b''
else:
sfn = filename.lstrip('.').upper().encode(self._encoding, 'replace')
sfn = sfn.replace(b' ', b'')
if b'.' in sfn:
sfn, ext = sfn.rsplit(b'.', 1)
else:
sfn, ext = sfn, b''
sfn = self.SFN_VALID.sub(b'_', sfn)
ext = self.SFN_VALID.sub(b'_', ext)
if len(sfn) <= 8 and len(ext) <= 3:
# NOTE: Huh, a place where match..case might actually be
# useful! Why isn't this a dict? It was originally, but in
# purely symbolic cases (e.g. "." and "..") the transformed SFN
# can be equivalent in all cases and we want to explicitly prefer
# the case where attr is 0.
sfn_only = True
lfn = filename.encode(self._encoding, 'replace')
make_sfn = lambda s, e: (s + b'.' + e) if e else s
if lfn == make_sfn(sfn, ext):
attr = 0
elif lfn == make_sfn(sfn, ext.lower()):
attr = 0b10000
elif lfn == make_sfn(sfn.lower(), ext):
attr = 0b01000
elif lfn == make_sfn(sfn.lower(), ext.lower()):
attr = 0b11000
else:
sfn_only = False
attr = 0
else:
sfn_only = False
attr = 0
if sfn_only:
lfn = b''
else:
lfn = filename.encode('utf-16le')
if len(lfn) > 255 * 2:
raise ValueError(lang._(
'{filename} is too long (more than 255 UCS-2 characters)'
.format(filename=filename)))
# NUL terminate if len(result) mod 26
# result fits perfectly in a LongFilenameEntry)
if len(lfn) % 26:
lfn += b'\0\0'
if len(lfn) % 26:
pad = ((len(lfn) + 25) // 26) * 26
lfn = lfn.ljust(pad, b'\xff')
assert len(lfn) % 26 == 0
ext = ext[:3]
sfn = self._get_unique_sfn(
sfn.decode(self._encoding),
ext.decode(self._encoding)).encode(self._encoding, 'replace')
sfn = sfn.ljust(8, b' ')
ext = ext.ljust(3, b' ')
return lfn, sfn, ext, attr
[docs]
def _get_unique_sfn(self, prefix, ext):
"""
Given *prefix* and *ext*, which are :class:`str`, of the short filename
prefix and extension, find a suffix that is unique in the directory
(amongst both long *and* short filenames, because these are still in
the same namespace).
For example, in a directory containing ``default.config`` (which has
shortname ``DEFAUL~1.CON``), given the filename and extension
``default.conf``, this function will return the :class:`str`
``DEFAUL~2.CON``.
Because the search requires enumeration of the whole directory, which
is expensive, an artificial limit of :data:`MAX_SFN_SUFFIX` is
enforced. If this is reached, the search will terminate with an
:exc:`OSError` with code ENOSPC (out of space).
"""
ranges = [range(1, self.MAX_SFN_SUFFIX)]
regexes = [
re.compile(
f'{re.escape(prefix[:7 - i])}~([0-9]{{{i}}})\\.{re.escape(ext)}'
if ext else
f'{re.escape(prefix[:7 - i])}~([0-9]{{{i}}})',
re.IGNORECASE)
for i in range(1, len(str(self.MAX_SFN_SUFFIX)) + 1)
]
for offset, entries in self._group_entries():
lfn, sfn, entry = self._split_entries(entries)
m = any_match(sfn, regexes)
if m:
exclude(ranges, int(m.group(1)))
m = any_match(lfn, regexes)
if m:
exclude(ranges, int(m.group(1)))
for r in ranges:
return f'{prefix[:7 - len(str(r.start))]}~{r.start}'
# We cannot create any shortnames that aren't already taken. Given the
# limit on entries in a dir (MAX_SFN_SUFFIX, roughly) report ENOSPC
raise OSError(errno.ENOSPC, os.strerror(errno.ENOSPC))
[docs]
def _group_entries(self):
"""
Generator which yields an offset, and a sequence of either
:class:`~nobodd.fat.LongFilenameEntry` and
:class:`~nobodd.fat.DirectoryEntry` instances.
Each tuple yielded represents a single (extant, non-deleted) file or
directory with its long-filename entries at the start, and the
directory entry as the final element. The offset associated with the
sequence is the offset of the *directory entry* (not its preceding long
filename entries). In other words, for a file with three long-filename
entries, the following might be yielded::
(160, [
<LongFilenameEntry>),
<LongFilenameEntry>),
<LongFilenameEntry>),
<DirectoryEntry>)
])
This indicates that the directory entry is at offset 160, preceded by
long filename entries at offsets 128, 96, and 64.
"""
entries = []
for offset, entry in self._iter_entries():
if isinstance(entry, LongFilenameEntry):
if entry.sequence == 0xE5: # deleted entry
continue
entries.append(entry)
if isinstance(entry, DirectoryEntry):
if entry.filename[0] == 0: # end of valid entries
break
elif entry.attr & 0x8: # volume label
pass
elif entry.filename[0] != 0xE5: # deleted entry
yield offset, entries
entries = []
[docs]
def _clean_entries(self):
"""
Find and remove all deleted entries from the directory.
The method scans the directory for all directory entries and long
filename entries which start with 0xE5, indicating a deleted entry,
and overwrites them with later (not deleted) entries. Trailing entries
are then zeroed out. The return value is the new offset of the terminal
entry.
"""
write_offset = 0
for read_offset, entry in self._iter_entries():
if isinstance(entry, DirectoryEntry):
if entry.filename[0] == 0: # end of valid entries
break
elif entry.filename[0] == 0xE5: # deleted entry
continue
if isinstance(entry, LongFilenameEntry):
if entry.sequence == 0xE5: # deleted entry
continue
if read_offset > write_offset:
self._update_entry(write_offset, entry)
write_offset += DirectoryEntry._FORMAT.size
else:
# If we exit the loop without a break, the source has no EOF marker
# which is strictly invalid; advance the read_offset to force one
read_offset += DirectoryEntry._FORMAT.size
eof = write_offset
empty = DirectoryEntry.eof()
while write_offset < read_offset:
self._update_entry(write_offset, empty)
write_offset += DirectoryEntry._FORMAT.size
return eof
def __len__(self):
return sum(1 for lfn in self)
def __iter__(self):
for offset, entries in self._group_entries():
lfn, sfn, entry = self._split_entries(entries)
yield lfn
[docs]
def items(self):
# NOTE: Overridden to avoid quadratic behaviour of inherited method
for offset, entries in self._group_entries():
lfn, sfn, entry = self._split_entries(entries)
yield lfn, entry
[docs]
def values(self):
# NOTE: Overridden to avoid quadratic behaviour of inherited method
for offset, entries in self._group_entries():
lfn, sfn, entry = self._split_entries(entries)
yield entry
def __contains__(self, name):
uname = name.upper()
for offset, entries in self._group_entries():
lfn, sfn, entry = self._split_entries(entries)
if lfn.upper() == uname or sfn == name:
return True
return False
def __getitem__(self, name):
uname = name.upper()
for offset, entries in self._group_entries():
lfn, sfn, entry = self._split_entries(entries)
if lfn.upper() == uname or sfn == uname:
return entry
raise KeyError(name)
def __setitem__(self, name, entry):
# NOTE: For the purposes of setting entries, the filename and ext
# within *entry* are ignored. For new entries, these will be generated
# from *name*. For existing entries, the existing values will be
# re-used
uname = name.upper()
offset = -DirectoryEntry._FORMAT.size
for offset, entries in self._group_entries():
lfn, sfn, old_entry = self._split_entries(entries)
if lfn.upper() == uname or sfn == uname:
self._update_entry(offset, entry._replace(
filename=old_entry.filename, ext=old_entry.ext))
return
# This isn't *necessarily* the actual EOF. It could be orphaned or
# deleted entries that _group_entries isn't yielding, but that doesn't
# matter for our purposes. All that matters is that we can safely
# overwrite these entries
eof_offset = offset + DirectoryEntry._FORMAT.size
entries = self._prefix_entries(name, entry)
entries.append(DirectoryEntry.eof())
for cleaned in (False, True):
# We write the entries in reverse order to make it more likely that
# anything scanning the directory simultaneously sees the append as
# "atomic" (because the last item written overwrites the old
# terminal marker entry)
offsets = range(
eof_offset,
eof_offset + len(entries) * DirectoryEntry._FORMAT.size,
DirectoryEntry._FORMAT.size)
try:
for offset, entry in reversed(list(zip(offsets, entries))):
self._update_entry(offset, entry)
except OSError as e:
# If the directory structure runs out of space (which is more
# likely under FAT-12 and FAT-16 where the root directory is
# fixed in size), then all deleted entries will be expunged,
# and the method will attempt to append the new entries once
# more
if e.errno == errno.ENOSPC and not cleaned:
eof_offset = self._clean_entries()
else:
raise
else:
return
def __delitem__(self, name):
uname = name.upper()
for offset, entries in self._group_entries():
lfn, sfn, entry = self._split_entries(entries)
if lfn.upper() == uname or sfn == uname:
# NOTE: We update the DirectoryEntry first then work backwards,
# marking the long filename entries. This ensures anything
# simultaneously scanning the directory shouldn't find a "live"
# directory entry preceded by "dead" long filenames
for entry in reversed(entries):
if isinstance(entry, DirectoryEntry):
self._update_entry(offset, entry._replace(
filename=b'\xE5' + entry.filename[1:]))
else: # LongFilenameEntry
self._update_entry(offset, entry._replace(
sequence=0xE5))
offset -= DirectoryEntry._FORMAT.size
return
raise KeyError(name)
cluster = property(lambda self: self._get_cluster())
[docs]
class FatRoot(FatDirectory):
"""
An abstract derivative of :class:`FatDirectory` representing the
(fixed-size) root directory of a FAT-12 or FAT-16 file-system. Must be
constructed with *mem*, which is a buffer object covering the root
directory clusters, and *encoding*, which is taken from
:attr:`FatFileSystem.sfn_encoding`. The :class:`Fat12Root` and
:class:`Fat16Root` classes are (trivial) concrete derivatives of this.
"""
__slots__ = ('_mem',)
def __init__(self, mem, encoding):
self._encoding = encoding
self._mem = mem
def _get_cluster(self):
return 0
def _update_entry(self, offset, entry):
if offset >= len(self._mem):
raise OSError(errno.ENOSPC, os.strerror(errno.ENOSPC))
entry.to_buffer(self._mem, offset)
def _iter_entries(self):
for offset in range(0, len(self._mem), DirectoryEntry._FORMAT.size):
entry = DirectoryEntry.from_buffer(self._mem, offset)
if entry.attr == 0x0F:
entry = LongFilenameEntry.from_buffer(self._mem, offset)
yield offset, entry
[docs]
class FatSubDirectory(FatDirectory):
"""
A concrete derivative of :class:`FatDirectory` representing a sub-directory
in a FAT file-system (of any type). Must be constructed with *fs* (a
:class:`FatFileSystem` instance), *start* (the first cluster of the
sub-directory), and *encoding*, which is taken from
:attr:`FatFileSystem.sfn_encoding`.
"""
__slots__ = ('_cs', '_file', 'fat_type')
def __init__(self, fs, start, encoding):
self._encoding = encoding
self._cs = fs.clusters.size
# NOTE: We always open sub-directories with a writable mode when
# possible; this simply parallels the state in FAT-12/16 root
# directories which are always writable (if the underlying mapping is)
self._file = fs.open_file(start, mode='rb' if fs.readonly else 'r+b')
self.fat_type = fs.fat_type
def _get_cluster(self):
return self._file._map[0]
def _update_entry(self, offset, entry):
pos = self._file.tell()
try:
self._file.seek(offset)
self._file.write(bytes(entry))
finally:
self._file.seek(pos)
def _iter_entries(self):
buf = bytearray(self._cs)
buf_offset = offset = 0
self._file.seek(0)
while self._file.readinto(buf):
buf_offset += offset
offset = 0
for entry in DirectoryEntry.iter_over(buf):
if entry.attr == 0x0F:
entry = LongFilenameEntry.from_buffer(buf, offset)
yield buf_offset + offset, entry
offset += DirectoryEntry._FORMAT.size
[docs]
class Fat12Root(FatRoot):
"""
Concrete, trivial derivative of :class:`FatRoot` which simply declares the
root as belonging to a FAT-12 file-system.
.. autoattribute:: fat_type
"""
fat_type = 'fat12'
[docs]
class Fat16Root(FatRoot):
"""
Concrete, trivial derivative of :class:`FatRoot` which simply declares the
root as belonging to a FAT-16 file-system.
.. autoattribute:: fat_type
"""
fat_type = 'fat16'
[docs]
class Fat32Root(FatSubDirectory):
"""
This is a trivial derivative of :class:`FatSubDirectory` because, in
FAT-32, the root directory is represented by the same structure as a
regular sub-directory.
"""
[docs]
class FatFile(io.RawIOBase):
"""
Represents an open file from a :class:`FatFileSystem`.
You should never need to construct this instance directly. Instead it (or
wrapped variants of it) is returned by the
:meth:`~nobodd.path.FatPath.open` method of :class:`~nobodd.path.FatPath`
instances. For example::
from nobodd.disk import DiskImage
from nobodd.fs import FatFileSystem
with DiskImage('test.img') as img:
with FatFileSystem(img.partitions[1].data) as fs:
path = fs.root / 'bar.txt'
with path.open('r', encoding='utf-8') as f:
print(f.read())
Instances can (and should) be used as context managers to implicitly close
references upon exiting the context. Instances are readable and seekable,
and writable, depending on their opening mode and the nature of the
underlying :class:`FatFileSystem`.
As a derivative of :class:`io.RawIOBase`, all the usual I/O methods should
be available.
"""
__slots__ = ('_fs', '_map', '_index', '_entry', '_pos', '_mode')
def __init__(self, fs, start, mode='rb', index=None, entry=None):
super().__init__()
if 'b' not in mode:
raise ValueError(lang._(
'non-binary mode {mode!r} not supported'.format(mode=mode)))
self._fs = weakref.ref(fs)
if start:
self._map = list(fs.fat.chain(start))
else:
self._map = []
self._index = index
self._entry = entry
self._pos = 0
if 'w' in mode:
self._mode = '+' if '+' in mode else 'w'
self.truncate()
elif 'a' in mode:
self._mode = '+' if '+' in mode else 'w'
self.seek(0, os.SEEK_END)
else:
self._mode = '+' if '+' in mode else 'r'
[docs]
@classmethod
def from_cluster(cls, fs, start, mode='rb'):
"""
Construct a :class:`FatFile` from a :class:`FatFileSystem`, *fs*, and
a *start* cluster. The optional *mode* is equivalent to the built-in
:func:`open` function.
Files constructed via this method do not have an associated directory
entry. As a result, their size is assumed to be the full size of their
cluster chain. This is typically used for the "file" backing a
:class:`FatSubDirectory`.
.. warning::
This method is intended for internal use by the
:class:`~nobodd.path.FatPath` class.
"""
return cls(fs, start, mode)
[docs]
@classmethod
def from_entry(cls, fs, index, entry, mode='rb'):
"""
Construct a :class:`FatFile` from a :class:`FatFileSystem`, *fs*, a
:class:`FatDirectory`, *index*, and a
:class:`~nobodd.fat.DirectoryEntry`, *entry*. The optional *mode* is
equivalent to the built-in :func:`open` function.
Files constructed via this method have an associated directory entry
which will be updated if/when reads or writes occur (updating atime,
mtime, and size fields).
.. warning::
This method is intended for internal use by the
:class:`~nobodd.path.FatPath` class.
"""
return cls(fs, get_cluster(entry, fs.fat_type), mode, index, entry)
def _get_fs(self):
"""
Check the weak reference to the FatFileSystem. If it's still live,
return the strong reference result. If it's disappeared, raise an
:exc:`OSError` exception indicating the file-system has been closed.
"""
fs = self._fs()
if fs is None:
raise ValueError(lang._(
'FatFileSystem containing {self!r} is closed'
.format(self=self)))
return fs
def _get_size(self):
"""
Returns the current size of the file. If the file has an associated
directory entry, we simply return the size recorded there. Otherwise,
the size is full size of all clusters in the file's chain.
"""
fs = self._get_fs()
if self._entry is None:
return fs.clusters.size * len(self._map)
else:
return self._entry.size
def _get_key(self):
"""
Returns the short filename (SFN) key for the associated directory
entry. This is used by various internal methods to locate the entry
for updating in the associated directory index.
"""
if self._entry is None:
raise ValueError(lang._('no key for entry-less FatFile'))
fs = self._get_fs()
filename = self._entry.filename.rstrip(b' ')
assert filename != b'\0' * 8
ext = self._entry.ext.rstrip(b' ')
assert ext != b'\0' * 3
return (
filename + b'.' + ext if ext else filename
).decode(fs.sfn_encoding)
def _set_size(self, new_size):
"""
Update the size of the file in the associated directory entry, if any.
If the file has no associated directory entry, this is a no-op.
"""
if self._entry is not None:
try:
first_cluster = self._map[0]
except IndexError:
# Only set first_cluster to 0 if the map is actually empty;
# we ignore size here because we allow size to be 0 with a
# cluster allocated while the file is open so that the file
# doesn't "move cluster" while it's opened, even if it's
# truncated. Only on close() do we remove the last cluster
first_cluster = 0
entry = self._entry._replace(
size=new_size,
first_cluster_hi=first_cluster >> 16,
first_cluster_lo=first_cluster & 0xFFFF)
self._index[self._get_key()] = entry
self._entry = entry
def _set_atime(self, ts=None):
"""
Update the access timestamp of the file in the associated directory
entry, if any, to the :class:`~datetime.datetime` *ts*. If the file has
no associated directory entry, this is a no-op.
"""
if self._entry is not None:
if ts is None:
ts = dt.datetime.now()
adate, _, _ = encode_timestamp(ts)
# This slightly convoluted logic is because assigning to _index
# causes writes to the underlying media and can fail for a variety
# of reasons (including no more space in the dir). Hence, don't
# re-write self._entry until it's actually written to disk.
entry = self._entry._replace(adate=adate)
self._index[self._get_key()] = entry
self._entry = entry
def _set_mtime(self, ts=None):
"""
Update the last-modified timestamp of the file in the associated
directory entry, if any, to the :class:`~datetime.datetime` *ts*. If
the file has no associated directory entry, this is a no-op.
"""
if self._entry is not None:
if ts is None:
ts = dt.datetime.now()
mdate, mtime, _ = encode_timestamp(ts)
# See note above
entry = self._entry._replace(mdate=mdate, mtime=mtime)
self._index[self._get_key()] = entry
self._entry = entry
def _check_closed(self):
if self.closed:
raise ValueError(lang._('I/O operation on closed file'))
[docs]
def close(self):
if not self.closed:
if self._entry is not None and self._entry.size == 0 and self._map:
# See note in _set_size
assert len(self._map) == 1
fs = self._get_fs()
fs.fat.mark_free(self._map[0])
self._map = []
self._set_size(0)
super().close()
[docs]
def readable(self):
return self._mode in 'r+'
[docs]
def seekable(self):
return True
[docs]
def writable(self):
return self._mode in 'w+'
[docs]
def readall(self):
self._check_closed()
if not self.readable():
raise io.UnsupportedOperation()
size = self._get_size()
buf = bytearray(max(0, size - self._pos))
mem = memoryview(buf)
pos = 0
while self._pos < size:
pos += self.readinto(mem[pos:])
return bytes(buf)
def readinto(self, buf):
self._check_closed()
if not self.readable():
raise io.UnsupportedOperation()
fs = self._get_fs()
cs = fs.clusters.size
size = self._get_size()
# index is which cluster of the file we wish to read; i.e. index 0
# represents the first cluster of the file; left and right are the byte
# offsets within the cluster to return; read is the number of bytes to
# return
index = self._pos // cs
left = self._pos - (index * cs)
right = min(cs, left + len(buf), size - (index * cs))
read = max(right - left, 0)
if read > 0:
buf[:read] = fs.clusters[self._map[index]][left:right]
self._pos += read
if fs.atime and not fs.readonly:
self._set_atime()
return read
def write(self, buf):
self._check_closed()
if not self.writable():
raise io.UnsupportedOperation()
mem = memoryview(buf)
fs = self._get_fs()
size = self._get_size()
if self._pos > size:
# Pad the file to the current position. Note that this does *not*
# count towards written
self.truncate()
written = 0
try:
while mem:
# Alternate between filling a cluster with _write1, and
# allocating a new cluster. This is far from the most efficient
# method (we're not taking account of whether any clusters are
# actually contiguous), but it is simple!
w = self._write1(mem, fs)
if w:
written += w
mem = mem[w:]
else:
# TODO In event of ENOSPC, raise or return written so far?
for cluster in fs.fat.free():
fs.fat.mark_end(cluster)
if self._map:
fs.fat[self._map[-1]] = cluster
self._map.append(cluster)
break
finally:
if self._pos > size:
self._set_size(self._pos)
self._set_mtime()
return written
def _write1(self, buf, fs=None):
"""
Write as much of *buf* to the file at the current position as will fit
in the current cluster, returning the number of bytes written, and
advancing the position of the file-pointer. If the current position is
beyond the end of the file, this method writes nothing and return 0.
"""
self._check_closed()
if fs is None:
fs = self._get_fs()
mem = memoryview(buf)
cs = fs.clusters.size
index = self._pos // cs
left = self._pos - (index * cs)
right = min(cs, left + len(mem))
written = max(right - left, 0)
if written > 0:
try:
fs.clusters[self._map[index]][left:right] = mem[:written]
except IndexError:
return 0
self._pos += written
return written
[docs]
def seek(self, pos, whence=io.SEEK_SET):
self._check_closed()
if whence == io.SEEK_SET:
pos = pos
elif whence == io.SEEK_CUR:
pos = self._pos + pos
elif whence == io.SEEK_END:
pos = self._get_size() + pos
else:
raise ValueError(lang._(
'invalid whence: {whence}'.format(whence=whence)))
if pos < 0:
raise OSError(errno.EINVAL, os.strerror(errno.EINVAL))
self._pos = pos
return self._pos
[docs]
def truncate(self, size=None):
self._check_closed()
if not self.writable():
raise io.UnsupportedOperation()
fs = self._get_fs()
cs = fs.clusters.size
old_size = self._get_size()
if size is None:
size = self._pos
if size == old_size:
return size
clusters = max(1, (size + cs - 1) // cs)
if size > old_size:
# If we're expanding the size of the file, zero the tail of the
# current final cluster; this is necessary whether or not we're
# expanding the actual number of clusters in the file. Note we
# don't bother calculating exactly how many bytes to zero; we just
# zero everything from the current size up to the end of the
# cluster because that's fine in either case
tail = len(self._map) * cs - old_size
if tail:
fs.clusters[self._map[-1]][-tail:] = b'\0' * tail
if clusters > len(self._map):
# Pre-calculate the clusters we're going to append. We don't want
# to add any if we can't add them all. We then mark the clusters
# in the FAT in reverse order, zeroing new blocks as we go so that
# the final extension of the file is effectively atomic (from a
# concurrent reader's perspective)
to_append = list(islice(fs.fat.free(), clusters - len(self._map)))
fs.fat.mark_end(to_append[-1])
zeros = b'\0' * cs
for next_c, this_c in pairwise(reversed([self._map[-1]] + to_append)):
fs.clusters[next_c] = zeros
fs.fat[this_c] = next_c
self._map.extend(to_append)
elif clusters < len(self._map):
# We start by marking the new end cluster, which atomically
# shortens the FAT chain for the file, then proceed to mark all the
# removed clusters as free
to_remove = self._map[len(self._map) - clusters:]
fs.fat.mark_end(self._map[clusters - 1])
del self._map[clusters:]
for cluster in to_remove:
fs.fat.mark_free(cluster)
# Finally, correct the directory entry to reflect the new size
self._set_size(size)
self._set_mtime()
return size