# Source code for nobodd.tftp

# nobodd: a boot configuration tool for the Raspberry Pi
#
# Copyright (c) 2023-2024 Dave Jones <dave.jones@canonical.com>
# Copyright (c) 2023-2024 Canonical Ltd.
#
# SPDX-License-Identifier: GPL-3.0

import re
import struct
from enum import IntEnum, auto

from . import lang
from .tools import labels, formats, FrozenDict


# The following references were essential in constructing this module: the
# original TFTP version 2 [RFC1350], the TFTP option extension [RFC2347], and
# the Wikipedia page documenting the protocol [1].
#
# [1]: https://en.wikipedia.org/wiki/Trivial_File_Transfer_Protocol
# [RFC1350]: https://datatracker.ietf.org/doc/html/rfc1350
# [RFC2347]: https://datatracker.ietf.org/doc/html/rfc2347


# Name of the block-size option, followed by the smallest, default, and
# largest block sizes. The default matches the 512 byte DATA payload mentioned
# in the DATAPacket documentation.
# NOTE(review): the minimum and maximum look like the RFC 2348 limits; confirm
TFTP_BLKSIZE = 'blksize'
TFTP_MIN_BLKSIZE = 8
TFTP_DEF_BLKSIZE = 512
TFTP_MAX_BLKSIZE = 65464

# Names of the two timeout options, followed by the minimum, maximum, and
# default timeouts expressed in nanoseconds.
# NOTE(review): 'utimeout' is presumably a finer-grained variant of 'timeout';
# confirm the unit against the code that negotiates it
TFTP_TIMEOUT = 'timeout'
TFTP_UTIMEOUT = 'utimeout'
TFTP_MIN_TIMEOUT_NS = 10_000_000 # 10ms
TFTP_MAX_TIMEOUT_NS = 255_000_000_000 # 255s
TFTP_DEF_TIMEOUT_NS = 1_000_000_000 # 1s

# Transfer modes accepted in RRQ / WRQ packets; RRQPacket.from_data rejects
# any mode that is not a member of TFTP_MODES
TFTP_BINARY = 'octet'
TFTP_NETASCII = 'netascii'
TFTP_MODES = frozenset({TFTP_BINARY, TFTP_NETASCII})

# Name of the transfer-size option, and the set of all option names defined
# by this module
TFTP_TSIZE = 'tsize'
TFTP_OPTIONS = frozenset({TFTP_TSIZE, TFTP_BLKSIZE, TFTP_TIMEOUT,
                          TFTP_UTIMEOUT})


class OpCode(IntEnum):
    """
    The operation codes of the `Trivial File Transfer Protocol`_ (TFTP). Every
    TFTP packet starts with one of these values, which identifies the kind of
    packet that follows.

    .. _Trivial File Transfer Protocol: https://en.wikipedia.org/wiki/Trivial_File_Transfer_Protocol
    """
    RRQ = 1
    WRQ = 2
    DATA = 3
    ACK = 4
    ERROR = 5
    OACK = 6
class Error(IntEnum):
    """
    The error codes of the `Trivial File Transfer Protocol`_ (TFTP). A packet
    whose :class:`OpCode` is ``ERROR`` carries one of these values to say what
    kind of failure occurred.

    .. _Trivial File Transfer Protocol: https://en.wikipedia.org/wiki/Trivial_File_Transfer_Protocol
    """
    UNDEFINED = 0
    NOT_FOUND = 1
    NOT_AUTH = 2
    DISK_FULL = 3
    BAD_OP = 4
    UNKNOWN_ID = 5
    EXISTS = 6
    UNKNOWN_USER = 7
    INVALID_OPT = 8
class Packet:
    """
    Abstract base class for all TFTP packets. This provides the class method
    :meth:`Packet.from_bytes` which constructs and returns the appropriate
    concrete sub-class for the :class:`OpCode` found at the beginning of the
    packet's data.

    Instances of the concrete classes may be converted back to :class:`bytes`
    simply by calling :class:`bytes` on them::

        >>> b = b'\\x00\\x01config.txt\\0octet\\0'
        >>> r = Packet.from_bytes(b)
        >>> r
        RRQPacket(filename='config.txt', mode='octet', options=FrozenDict({}))
        >>> bytes(r)
        b'\\x00\\x01config.txt\\x00octet\\x00'

    Concrete classes can also be constructed directly, for conversion into
    :class:`bytes` during transfer::

        >>> bytes(ACKPacket(block=10))
        b'\\x00\\x04\\x00\\n'
        >>> bytes(RRQPacket('foo.txt', 'netascii', {'tsize': 0}))
        b'\\x00\\x01foo.txt\\x00netascii\\x00tsize\\x000\\x00'
    """
    __slots__ = ()
    opcode = None

    def __repr__(self):
        # Walk the whole MRO (base classes first) rather than looking only at
        # the instance's own class. A sub-class that adds no fields declares
        # ``__slots__ = ()`` and would otherwise be shown with no fields at
        # all, even though it inherits the slots of its parent.
        fields = ', '.join(
            f'{field}={getattr(self, field)!r}'
            for klass in reversed(type(self).__mro__)
            for field in getattr(klass, '__slots__', ()))
        return f'{self.__class__.__name__}({fields})'

    @classmethod
    def from_bytes(cls, s):
        """
        Given a :class:`bytes`-string *s*, checks the :class:`OpCode` at the
        front, and constructs one of the concrete packet types defined below,
        returning an instance of that type (rather than :class:`Packet`, which
        is abstract)::

            >>> Packet.from_bytes(b'\\x00\\x01config.txt\\0octet\\0')
            RRQPacket(filename='config.txt', mode='octet', options=FrozenDict({}))

        Raises :exc:`ValueError` if the op-code is not one of the known
        :class:`OpCode` values, and :exc:`struct.error` if *s* is too short to
        contain an op-code at all.
        """
        opcode, = struct.unpack_from('!H', s)
        try:
            packet_cls = {
                OpCode.RRQ:   RRQPacket,
                OpCode.WRQ:   WRQPacket,
                OpCode.DATA:  DATAPacket,
                OpCode.ACK:   ACKPacket,
                OpCode.ERROR: ERRORPacket,
                OpCode.OACK:  OACKPacket,
            }[opcode]
        except KeyError:
            # Translate the *template* and only then substitute the value;
            # formatting first would hand the translation machinery a string
            # that can never appear in a message catalogue
            raise ValueError(lang._(
                'invalid packet opcode {opcode}').format(
                    opcode=opcode)) from None
        else:
            # Everything after the two-byte op-code is the packet body
            return packet_cls.from_data(s[2:])

    @classmethod
    def from_data(cls, data):
        """
        Constructs an instance of the packet class with the specified *data*
        (which is everything in the :class:`bytes`-string passed to
        :meth:`from_bytes` minus the header).

        This method is not implemented in :class:`Packet` but is expected to be
        implemented in any concrete descendant.
        """
        raise NotImplementedError()
class RRQPacket(Packet):
    """
    Concrete type for ``RRQ`` (read request) packets.

    These packets are sent by a client to initiate a transfer. They include the
    *filename* to be sent, the *mode* to send it (one of the strings "octet" or
    "netascii"), and any *options* the client wishes to negotiate.
    """
    __slots__ = ('filename', 'mode', 'options')
    opcode = OpCode.RRQ
    # A single NUL-terminated option name followed by its NUL-terminated value
    options_re = re.compile(
        rb'(?P<name>[\x20-\xFF]+)\0(?P<value>[\x01-\xFF]*)\0')
    # The packet body: filename, mode, then zero or more name/value options
    packet_re = re.compile(
        rb'^'
        rb'(?P<filename>[\x20-\xFF]+)\0'
        rb'(?P<mode>[a-zA-Z]+)\0'
        rb'(?P<options>(?:[\x20-\xFF]+\0[\x01-\xFF]*\0)*)'
        rb'.*')

    def __init__(self, filename, mode, options=None):
        self.filename = str(filename)
        self.mode = str(mode).lower()
        if options is None:
            options = ()
        self.options = FrozenDict(options)

    def __bytes__(self):
        parts = [
            struct.pack('!H', self.opcode),
            # from_data accepts UTF-8 encoded filenames, so encode the same
            # way here; otherwise a packet parsed from the wire could not be
            # converted back to bytes. ASCII-only names encode identically.
            self.filename.encode('utf-8'), b'\0',
            self.mode.encode('ascii'), b'\0',
        ]
        for name, value in self.options.items():
            parts.extend((
                name.encode('ascii'), b'\0',
                str(value).encode('ascii'), b'\0',
            ))
        return b''.join(parts)

    @classmethod
    def from_data(cls, data):
        """
        Constructs the packet from *data*, the body of an ``RRQ`` / ``WRQ``
        packet (everything after the op-code).

        Raises :exc:`ValueError` if *data* does not have the expected layout,
        if the transfer mode is not one of ``TFTP_MODES``, or if any field
        cannot be decoded.
        """
        match = cls.packet_re.match(data)
        if match is None:
            raise ValueError(lang._('badly formed RRQ/WRQ packet'))
        filename, mode, suffix = match.groups()
        # Technically the filename must be in ASCII format (7-bit chars in an
        # 8-bit field), but given ASCII is a strict subset of UTF-8, and that
        # UTF-8 cannot include NUL chars, I see no harm in permitting UTF-8
        # encoded filenames
        filename = filename.decode('utf-8')
        mode = mode.decode('ascii').lower()
        if mode not in TFTP_MODES:
            raise ValueError(lang._('unsupported file mode'))
        # Option names and values are both normalized to lower-case
        options = {
            opt.group('name').decode('ascii').lower():
                opt.group('value').decode('ascii').lower()
            for opt in cls.options_re.finditer(suffix)
        }
        return cls(filename, mode, options)
class WRQPacket(RRQPacket):
    """
    Concrete type for ``WRQ`` (write request) packets.

    A client sends one of these to begin a transfer *to* the server. The
    layout is the same as :class:`RRQPacket`: the *filename* to be sent, the
    *mode* to send it in (either "octet" or "netascii"), and whatever
    *options* the client wants to negotiate.
    """
    opcode = OpCode.WRQ
    __slots__ = ()
class DATAPacket(Packet):
    """
    Concrete type for ``DATA`` packets.

    A ``DATA`` packet answers an ``RRQ``, ``WRQ``, or ``ACK`` packet. It
    carries one block of the file being transferred as *data* (512 bytes by
    default, unless it is the last ``DATA`` packet of the transfer) together
    with the number of that *block*.
    """
    opcode = OpCode.DATA
    __slots__ = ('block', 'data')

    def __init__(self, block, data):
        self.block = int(block)
        if self.block < 1 or self.block > 65535:
            raise ValueError(f'invalid block (1..65535): {block}')
        self.data = bytes(data)

    def __bytes__(self):
        payload = self.data
        # Two unsigned 16-bit header fields followed by the raw block
        layout = f'!HH{len(payload)}s'
        return struct.pack(layout, self.opcode, self.block, payload)

    @classmethod
    def from_data(cls, data):
        """
        Builds the packet from *data*, the packet body after the op-code: a
        16-bit block number followed by the block's content.
        """
        (number,) = struct.unpack_from('!H', data)
        content = data[2:]
        return cls(number, content)
class ACKPacket(Packet):
    """
    Concrete type for ``ACK`` packets.

    An ``ACK`` packet answers a ``DATA`` packet, confirming that the given
    *block* was received successfully.
    """
    opcode = OpCode.ACK
    __slots__ = ('block',)

    def __init__(self, block):
        self.block = int(block)
        if self.block < 0 or self.block > 65535:
            raise ValueError(f'invalid block (0..65535): {block}')

    def __bytes__(self):
        # Op-code and block number, both unsigned 16-bit in network order
        return struct.pack('!HH', self.opcode, self.block)

    @classmethod
    def from_data(cls, data):
        """
        Builds the packet from *data*, the packet body after the op-code,
        which holds the 16-bit number of the acknowledged block.
        """
        (number,) = struct.unpack_from('!H', data)
        return cls(number)
class ERRORPacket(Packet):
    """
    Concrete type for ``ERROR`` packets.

    These are sent by either end of a transfer to indicate a fatal error
    condition. Receipt of an ``ERROR`` packet immediately terminates a transfer
    without further acknowledgment.

    The ``ERROR`` packet contains the *error* code (an :class:`Error` value)
    and a descriptive *message*. If *message* is omitted, a default description
    of *error* is used instead.
    """
    __slots__ = ('error', 'message')
    opcode = OpCode.ERROR

    def __init__(self, error, message=None):
        # Raises ValueError if *error* is not a known Error code
        self.error = Error(int(error))
        if message is None:
            self.message = {
                # NOTE: These messages are deliberately *not* marked for
                # translation as they are sent to the client
                Error.UNDEFINED:    'Undefined error',
                Error.NOT_FOUND:    'File not found',
                Error.NOT_AUTH:     'Access violation',
                Error.DISK_FULL:    'Disk full or allocation exceeded',
                Error.BAD_OP:       'Illegal TFTP operation',
                Error.UNKNOWN_ID:   'Unknown transfer ID',
                Error.EXISTS:       'File already exists',
                Error.UNKNOWN_USER: 'No such user',
                # Every member of Error needs an entry here, otherwise
                # constructing the packet without a message raises KeyError
                Error.INVALID_OPT:  'Invalid options specified',
            }[self.error]
        else:
            self.message = str(message)

    def __bytes__(self):
        # Encode first so the length in the format is that of the bytes
        # actually packed; the trailing "x" adds the terminating NUL
        message = self.message.encode('ascii')
        return struct.pack(
            f'!HH{len(message)}sx', self.opcode, self.error, message)

    @classmethod
    def from_data(cls, data):
        """
        Builds the packet from *data*, the packet body after the op-code: a
        16-bit error code followed by a NUL-terminated message. Bytes in the
        message that are not valid ASCII are replaced rather than rejected.
        """
        error, = struct.unpack_from('!H', data)
        return cls(error, data[2:].rstrip(b'\0').decode('ascii', 'replace'))
class OACKPacket(Packet):
    """
    Concrete type for ``OACK`` packets.

    When the client's ``RRQ`` packet includes options, the server replies with
    one of these in place of the first ``DATA`` packet. Its content is the set
    of *options* that the server accepts, each with its (possibly revised)
    value.
    """
    opcode = OpCode.OACK
    __slots__ = ('options',)
    options_re = RRQPacket.options_re

    def __init__(self, options):
        self.options = FrozenDict(options)

    def __bytes__(self):
        # Header first, then each option as NUL-terminated name and value
        chunks = [struct.pack('!H', self.opcode)]
        for name, value in self.options.items():
            chunks.append(name.encode('ascii'))
            chunks.append(b'\0')
            chunks.append(str(value).encode('ascii'))
            chunks.append(b'\0')
        return b''.join(chunks)

    @classmethod
    def from_data(cls, data):
        """
        Builds the packet from *data*, the packet body after the op-code: a
        sequence of NUL-terminated option names and values. Names and values
        are both converted to lower-case.
        """
        options = {}
        for found in cls.options_re.finditer(data):
            key = found.group('name').decode('ascii').lower()
            options[key] = found.group('value').decode('ascii').lower()
        return cls(options)