Source code for facedancer.endpoint

#
# This file is part of Facedancer
#
""" Functionality for describing USB endpoints. """

# Support annotations on Python < 3.9
from __future__  import annotations

import struct
import textwrap

from typing      import Iterable, List, Dict
from dataclasses import field
from collections import defaultdict

from .magic      import AutoInstantiable, instantiate_subordinates
from .descriptor import USBDescribable, USBDescriptor
from .request    import USBRequestHandler, get_request_handler_methods
from .request    import to_this_endpoint, standard_request_handler
from .types      import USBDirection, USBTransferType, USBSynchronizationType
from .types      import USBUsageType, USBStandardRequests

from .logging    import log


[docs] class USBEndpoint(USBDescribable, AutoInstantiable, USBRequestHandler): """ Class representing a USBEndpoint object. Field: number: The endpoint number (without the direction bit) for this endpoint. direction: A USBDirection constant indicating this endpoint's direction. transfer_type: A USBTransferType constant indicating the type of communications used. max_packet_size: The maximum packet size for this endpoint. interval: The polling interval, for an INTERRUPT endpoint. """ DESCRIPTOR_TYPE_NUMBER = 0x05 # Core identifiers. number : int direction : USBDirection # Endpoint attributes. transfer_type : USBTransferType = USBTransferType.BULK synchronization_type : USBSynchronizationType = USBSynchronizationType.NONE usage_type : USBUsageType = USBUsageType.DATA max_packet_size : int = 64 interval : int = 0 # Extra bytes that extend the basic endpoint descriptor. extra_bytes : bytes = b'' # Descriptors that will be included in a GET_CONFIGURATION response. attached_descriptors : List[USBDescriptor] = field(default_factory=list) # Descriptors that can be requested with the GET_DESCRIPTOR request. requestable_descriptors : Dict[tuple[int, int], USBDescriptor] = field(default_factory=dict) parent : USBDescribable = None
[docs] @classmethod def from_binary_descriptor(cls, data, strings={}): """ Creates an endpoint object from a description of that endpoint. """ # Parse the core descriptor into its components... address, attributes, max_packet_size, interval = struct.unpack_from("xxBBHB", data) # ... and break down the packed fields. number = address & 0x7F direction = address >> 7 transfer_type = attributes & 0b11 sync_type = attributes >> 2 & 0b1111 usage_type = attributes >> 4 & 0b11 return cls( number=number, direction=USBDirection(direction), transfer_type=USBTransferType(transfer_type), synchronization_type=USBSynchronizationType(sync_type), usage_type=USBUsageType(usage_type), max_packet_size=max_packet_size, interval=interval, extra_bytes=data[7:] )
def __post_init__(self): # Capture any descriptors declared directly on the class. for descriptor in instantiate_subordinates(self, USBDescriptor): self.add_descriptor(descriptor) # Grab our request handlers. self._request_handler_methods = get_request_handler_methods(self) # # User interface. #
[docs] @staticmethod def address_for_number(endpoint_number: int, direction: USBDirection) -> int: """ Computes the endpoint address for a given number + direction. """ direction_mask = 0x80 if direction == USBDirection.IN else 0x00 return endpoint_number | direction_mask
[docs] def get_device(self): """ Returns the device associated with the given descriptor. """ return self.parent.get_device()
[docs] def send(self, data: bytes, *, blocking: bool = False): """ Sends data on this endpoint. Valid only for IN endpoints. Args: data : The data to be sent. blocking : True if we should block until the backend reports the transmission to be complete. """ self.get_device()._send_in_packets(self.number, data, packet_size=self.max_packet_size, blocking=blocking)
# # Event handlers. #
[docs] def handle_data_received(self, data: bytes): """ Handler for receipt of non-control request data. Args: data : The raw bytes received. """ log.info(f"EP{self.number} received {len(data)} bytes of data; " "but has no handler.")
[docs] def handle_data_requested(self): """ Handler called when the host requests data on this endpoint."""
[docs] def handle_buffer_empty(self): """ Handler called when this endpoint first has an empty buffer. """
@standard_request_handler(number=USBStandardRequests.CLEAR_FEATURE) @to_this_endpoint def handle_clear_feature_request(self, request): log.debug(f"received CLEAR_FEATURE request for endpoint {self.number} " f"with value {request.value}") request.acknowledge() # # Properties. # @property def address(self): """ Fetches the address for the given endpoint. """ return self.address_for_number(self.number, self.direction)
[docs] def get_address(self): """ Method alias for the address property. For backend support. """ return self.address
@property def attributes(self): """ Fetches the attributes for the given endpoint, as a single byte. """ return (self.transfer_type & 0x03) | \ ((self.synchronization_type & 0x03) << 2) | \ ((self.usage_type & 0x03) << 4)
[docs] def add_descriptor(self, descriptor: USBDescriptor): """ Adds the provided descriptor to the endpoint. """ identifier = descriptor.get_identifier() desc_name = type(descriptor).__name__ if descriptor.include_in_config: self.attached_descriptors.append(descriptor) descriptor.parent = self elif descriptor.number is None: raise Exception( f"Descriptor of type {desc_name} cannot be added to this " f"endpoint because it is not to be included in the " f"configuration descriptor, yet does not have a number " f"to request it separately with") elif identifier in self.requestable_descriptors: other = self.requestable_descriptors[identifier] other_name = type(other).__name__ other_type = f"0x{other.type_number:02X}" raise Exception( f"Descriptor of type {desc_name} cannot be added to this " f"endpoint because there is already a descriptor of type " f"{other_name} with the same type code {other_type} and " f"number {other.number}") else: self.requestable_descriptors[identifier] = descriptor descriptor.parent = self
[docs] def get_descriptor(self) -> bytes: """ Get a descriptor string for this endpoint. """ # FIXME: use construct d = bytearray([ # length of descriptor in bytes 7 + len(self.extra_bytes), # descriptor type 5 == endpoint 5, self.address, self.attributes, self.max_packet_size & 0xff, (self.max_packet_size >> 8) & 0xff, self.interval ]) return d + self.extra_bytes
# # Automatic instantiation helpers. #
[docs] def get_identifier(self) -> int: return self.address
[docs] def matches_identifier(self, other:int) -> bool: # Use only the MSB and the lower nibble; per the USB specification. masked_other = other & 0b10001111 return self.get_identifier() == masked_other
# # Request handling. # def _request_handlers(self) -> Iterable[callable]: return self._request_handler_methods # # Pretty-printing. # def __str__(self): direction = USBDirection(self.direction).name transfer_type = USBTransferType(self.transfer_type).name is_interrupt = (self.transfer_type == USBTransferType.INTERRUPT) additional = f" every {self.interval}ms" if is_interrupt else "" return f"endpoint {self.number:02x}/{direction}: {transfer_type} transfers{additional}"
[docs] def generate_code(self, name=None, indent=0): if name is None: name = f"Endpoint_{self.number}_{self.direction.name}" direction = f"USBDirection.{self.direction.name}" transfer_type = f"USBTransferType.{self.transfer_type.name}" sync_type = f"USBSynchronizationType.{self.synchronization_type.name}" usage_type = f"USBUsageType.{self.usage_type.name}" values = str.join(", ", map(lambda x: f"0x{x:02x}", self.extra_bytes)) code = f""" class {name}(USBEndpoint): number = {self.number} direction = {direction} transfer_type = {transfer_type} synchronization_type = {sync_type} usage_type = {usage_type} max_packet_size = {self.max_packet_size} interval = {self.interval} extra_bytes = bytes([{values}]) """ # Use alphabetic suffixes to distinguish between multiple attached # descriptors with the same type number. suffixes = defaultdict(lambda: 'A') for descriptor in self.attached_descriptors: type_number = descriptor.type_number suffix = suffixes[type_number] suffixes[type_number] = chr(ord(suffix) + 1) name = f"Descriptor_0x{type_number:02X}_{suffix}" code += descriptor.generate_code(name=name, indent=4) for descriptor_id in sorted(self.requestable_descriptors): descriptor = self.requestable_descriptors[descriptor_id] code += descriptor.generate_code(indent=4) return textwrap.indent(code, indent * ' ')