Source code for facedancer.interface

#
# This file is part of facedancer.
#
""" Functionality for defining USB interfaces. """

# Support annotations on Python < 3.9
from __future__  import annotations

import struct
import textwrap

from typing       import Dict, List, Iterable
from dataclasses  import field
from collections  import defaultdict

from .magic       import instantiate_subordinates, AutoInstantiable
from .types       import USBDirection, USBStandardRequests

from .            import device
from .descriptor  import USBDescribable, USBDescriptor, USBClassDescriptor, USBDescriptorTypeNumber, StringRef
from .request     import USBControlRequest, USBRequestHandler, get_request_handler_methods
from .request     import standard_request_handler, to_this_interface
from .endpoint    import USBEndpoint

from .logging     import log


[docs] class USBInterface(USBDescribable, AutoInstantiable, USBRequestHandler): """ Class representing a USBDevice interface. Fields: number : The interface's index. Zero indexed. class_number, subclass_number, protocol_number : The USB class adhered to on this interface; usually a USBDeviceClass constant. interface_string : A short, descriptive string used to identify the endpoint; or None if not provided. """ DESCRIPTOR_TYPE_NUMBER = 0x4 name : StringRef = StringRef.field(string="generic USB interface") number : int = 0 alternate : int = 0 class_number : int = 0 subclass_number : int = 0 protocol_number : int = 0 interface_string : str = None # Descriptors that will be included in a GET_CONFIGURATION response. attached_descriptors : List[USBDescriptor] = field(default_factory=list) # Descriptors that can be requested with the GET_DESCRIPTOR request. requestable_descriptors : Dict[tuple[int, int], USBDescriptor] = field(default_factory=dict) endpoints : Dict[int, USBEndpoint] = field(default_factory=dict) parent : USBDescribable = None
[docs] @classmethod def from_binary_descriptor(cls, data, strings={}): """ Generates an interface object from a descriptor. """ interface_number, alternate_setting, num_endpoints, interface_class, \ interface_subclass, interface_protocol, string_index \ = struct.unpack_from("xxBBBBBBB", data) return cls( name=None, number=interface_number, alternate=alternate_setting, class_number=interface_class, subclass_number=interface_subclass, protocol_number=interface_protocol, interface_string=StringRef.lookup(strings, string_index) )
def __post_init__(self): self.interface_string = StringRef.ensure(self.interface_string) # Capture any descriptors/endpoints declared directly on the class. for endpoint in instantiate_subordinates(self, USBEndpoint): self.add_endpoint(endpoint) for descriptor in instantiate_subordinates(self, USBDescriptor): self.add_descriptor(descriptor) # Populate our request handlers. self._request_handler_methods = get_request_handler_methods(self) # # User interface. #
[docs] def get_device(self): """ Returns the device associated with the given descriptor. """ return self.parent.get_device()
[docs] def add_endpoint(self, endpoint: USBEndpoint): """ Adds the provided endpoint to the interface. """ if endpoint.address in self.endpoints: ep_name = type(endpoint).__name__ ep_addr = f"0x{endpoint.address:02X}" other = self.endpoints[endpoint.address] other_name = type(other).__name__ raise Exception( f"Endpoint of type {ep_name} cannot be added to this " f"interface because there is already an endpoint of " f"type {other_name} with the same address {ep_addr}") else: self.endpoints[endpoint.address] = endpoint endpoint.parent = self
[docs] def get_endpoint(self, endpoint_number: int, direction: USBDirection) -> USBEndpoint: """ Attempts to find a subordinate endpoint matching the given number/direction. Args: endpoint_number : The endpoint number to search for. direction : The endpoint direction to be matched. Returns: The matching endpoint; or None if no matching endpoint existed. """ address = USBEndpoint.address_for_number(endpoint_number, direction) return self.endpoints.get(address, None)
[docs] def has_endpoint(self, endpoint_number: int, direction: USBDirection) -> USBEndpoint: """ Returns true iff we have matching subordinate endpoint. Args: endpoint_number : The endpoint number to search for. direction : The endpoint direction to be matched. """ return (self.get_endpoint(endpoint_number, direction) is not None)
[docs] def add_descriptor(self, descriptor: USBDescriptor): """ Adds the provided descriptor to the interface. """ identifier = descriptor.get_identifier() desc_name = type(descriptor).__name__ if descriptor.include_in_config: self.attached_descriptors.append(descriptor) descriptor.parent = self elif descriptor.number is None: raise Exception( f"Descriptor of type {desc_name} cannot be added to this " f"interface because it is not to be included in the " f"configuration descriptor, yet does not have a number " f"to request it separately with") elif identifier in self.requestable_descriptors: other = self.requestable_descriptors[identifier] other_name = type(other).__name__ other_type = f"0x{other.type_number:02X}" raise Exception( f"Descriptor of type {desc_name} cannot be added to this " f"interface because there is already a descriptor of type " f"{other_name} with the same type code {other_type} and " f"number {other.number}") else: self.requestable_descriptors[identifier] = descriptor descriptor.parent = self
# # Event handlers. #
[docs] def handle_data_received(self, endpoint: USBEndpoint, data: bytes): """ Handler for receipt of non-control request data. Typically, this method will delegate any data received to the appropriate configuration/interface/endpoint. If overridden, the overriding function will receive all data; and can delegate it by calling the `.handle_data_received` method on `self.configuration`. Args: endpoint_number : The endpoint number on which the data was received. data : The raw bytes received on the relevant endpoint. """ if self.has_endpoint(endpoint.number, endpoint.direction): endpoint.handle_data_received(data) else: self.get_device().handle_unexpected_data_received(endpoint.number, data)
[docs] def handle_data_requested(self, endpoint: USBEndpoint): """ Handler called when the host requests data on a non-control endpoint. Typically, this method will delegate the request to the appropriate interface+endpoint. If overridden, the overriding function will receive all data. Args: endpoint_number : The endpoint number on which the host requested data. """ if self.has_endpoint(endpoint.number, endpoint.direction): endpoint.handle_data_requested() else: self.get_device().handle_unexpected_data_requested(endpoint.number)
[docs] def handle_buffer_empty(self, endpoint: USBEndpoint): """ Handler called when a given endpoint first has an empty buffer. Often, an empty buffer indicates an opportunity to queue data for sending ('prime an endpoint'), but doesn't necessarily mean that the host is planning on reading the data. This function is called only once per buffer. """ if self.has_endpoint(endpoint.number, endpoint.direction): endpoint.handle_buffer_empty()
# # Backend helpers. #
[docs] def get_endpoints(self): """ Returns an iterable over all endpoints in this interface. """ return self.endpoints.values()
# # Internal interface. # @standard_request_handler(number=USBStandardRequests.GET_DESCRIPTOR) @to_this_interface def handle_get_descriptor_request(self, request): """ Handle GET_DESCRIPTOR requests; per USB2 [9.4.3] """ log.debug("Handling GET_DESCRIPTOR on endpoint.") # This is the same as the USBDevice get descriptor request => avoid duplication. self.get_device().handle_generic_get_descriptor_request(self, request) # Table 9-12 of USB 2.0 spec (pdf page 296)
[docs] def get_descriptor(self) -> bytes: """ Retrieves the given interface's interface descriptor, with subordinates. """ # FIXME: use construct string_manager = self.get_device().strings d = bytearray([ 9, # length of descriptor in bytes 4, # descriptor type 4 == interface self.number, self.alternate, len(self.endpoints), self.class_number, self.subclass_number, self.protocol_number, string_manager.get_index(self.interface_string) ]) for descriptor in self.attached_descriptors: if callable(descriptor): d += descriptor() else: d += descriptor # ... append each endpoint's endpoint descriptor. for e in self.endpoints.values(): d += e.get_descriptor() for descriptor in e.attached_descriptors: if callable(descriptor): d += descriptor() else: d += descriptor return d
# # Alternate interface support. # @standard_request_handler(number=USBStandardRequests.SET_INTERFACE) @to_this_interface def handle_set_interface_request(self, request: USBControlRequest): """ Handle SET_INTERFACE requests; per USB2 [9.4.10] """ log.debug(f"f{self.name} received SET_INTERFACE request") configuration = self.parent device = configuration.parent backend = device.backend if device.configuration is None: request.stall() else: try: # Find this alternate setting and switch to it. number = request.index_low alternate = request.value identifier = (number, alternate) interface = configuration.interfaces[identifier] configuration.active_interfaces[number] = interface # Reset the data toggles of this interface's endpoints. for endpoint in interface.endpoints.values(): backend.clear_halt(endpoint.number, endpoint.direction) request.acknowledge() except KeyError: request.stall() @standard_request_handler(number=USBStandardRequests.GET_INTERFACE) @to_this_interface def handle_get_interface_request(self, request): """ Handle GET_INTERFACE requests; per USB2 [9.4.4] """ log.debug("received GET_INTERFACE request") configuration = self.parent device = configuration.parent if device.configuration is None: request.stall() else: try: number = request.index_low interface = configuration.active_interfaces[number] request.reply(bytes([interface.alternate])) except KeyError: request.stall() # # Automatic instantiation support. #
[docs] def get_identifier(self) -> (int, int): return (self.number, self.alternate)
# Although we identify interfaces by (number, alternate), this helper # is called from the request handling code, where we only want to # match by interface number. The correct alternate interface should have # been selected earlier in the request handling process.
[docs] def matches_identifier(self, other: int) -> bool: return (other == self.number)
# # Request handler functions. # def _request_handlers(self) -> Iterable[callable]: return self._request_handler_methods def _get_subordinate_handlers(self) -> Iterable[callable]: return self.endpoints.values()
[docs] def generate_code(self, name=None, indent=0): if name is None: if self.alternate == 0: name = f"Interface_{self.number}" else: name = f"Interface_{self.number}_{self.alternate}" code = f""" class {name}(USBInterface): number = {self.number} alternate = {self.alternate} class_number = {self.class_number} subclass_number = {self.subclass_number} protocol_number = {self.protocol_number} interface_string = {self.interface_string.generate_code()} """ # Use alphabetic suffixes to distinguish between multiple attached # descriptors with the same type number. suffixes = defaultdict(lambda: 'A') for descriptor in self.attached_descriptors: type_number = descriptor.type_number suffix = suffixes[type_number] suffixes[type_number] = chr(ord(suffix) + 1) name = f"Descriptor_0x{type_number:02X}_{suffix}" code += descriptor.generate_code(name=name, indent=4) for endpoint in self.endpoints.values(): code += endpoint.generate_code(indent=4) for descriptor_id in sorted(self.requestable_descriptors): descriptor = self.requestable_descriptors[descriptor_id] code += descriptor.generate_code(indent=4) return textwrap.indent(code, indent * ' ')