Source code for facedancer.devices.ftdi

# pylint: disable=unused-wildcard-import, wildcard-import
#
# This file is part of Facedancer.
#
""" Emulation of an FTDI USB-to-serial converter. """

import asyncio
import struct

from enum   import IntFlag
from typing import Union

from .         import default_main
from ..        import *
from ..classes import USBDeviceClass

from ..logging import log

OUT_ENDPOINT = 2
IN_ENDPOINT  = 1


[docs] class FTDIFlowControl(IntFlag): """ Constants describing how FTDI flow control works. """ NO_FLOW_CONTROL = 0 RTS_CTS = 1 DTR_DSR = 2 XON_XOFF = 4
[docs] @use_inner_classes_automatically class FTDIDevice(USBDevice): """ Class implementing an emulated FTDI device. """ vendor_id : int = 0x0403 product_id : int = 0x6001 device_revision : int = 0x0600 serial_number : str = "FT123450" manufacturer_string : StringRef = StringRef.field(string="not-FTDI") product_string : StringRef = StringRef.field(string="FTDI emulation") serial_number_string : StringRef = StringRef.field(string=serial_number) eeprom_data = [ 0x0440, 0x0304, 0x0160, 0x0006, 0x802D, 0x0800, 0x0002, 0x1812, 0x2A20, 0x4812, 0x0000, 0x0000, 0x1203, 0x6E00, 0x6F00, 0x7400, 0x2D00, 0x4600, 0x5400, 0x4400, 0x4900, 0x1E03, 0x4600, 0x5400, 0x4400, 0x4900, 0x2000, 0x6500, 0x6D00, 0x7500, 0x6C00, 0x6100, 0x7400, 0x6900, 0x6F00, 0x6E00, 0x1203, 0x4600, 0x5400, 0x3100, 0x3200, 0x3300, 0x3400, 0x3500, 0x3000, 0x0000, 0x0000, 0x0000, 0x0000, 0x0000, 0x0000, 0x0000, 0x0000, 0x0000, 0x0000, 0x0000, 0x0000, 0x0000, 0x0000, 0x0000, 0x0000, 0x0000, 0x0000, 0x6B67 ] class _Configuration(USBConfiguration): configuration_string : str = "FTDI config" class _Interface(USBInterface): # This is a completely vendor-specific device. class_number : int = USBDeviceClass.VENDOR_SPECIFIC subclass_number : int = USBDeviceClass.VENDOR_SPECIFIC protocol_number : int = USBDeviceClass.VENDOR_SPECIFIC class _OutEndpoint(USBEndpoint): number : int = OUT_ENDPOINT direction : USBDirection = USBDirection.OUT transfer_type : USBTransferType = USBTransferType.BULK class _InEndpoint(USBEndpoint): number : int = IN_ENDPOINT direction : USBDirection = USBDirection.IN transfer_type : USBTransferType = USBTransferType.BULK def __post_init__(self): super().__post_init__() # FTDI Windows driver reads serial number descriptor after reading the EEPROM for verification self.strings.add_string(self.serial_number, index=3) self.reset_ftdi()
[docs] def reset_ftdi(self): """ Resets the FTDI driver back to its original state. """ # Create a fake baud rate. self.baud_rate = 9600 # Start off with DTR/RTS disabled. self.use_dtr = False self.use_rts = False # Create synthetic values for our control signals. self.clear_to_send = True self.data_set_ready = True self.ring_detect = False self.line_status_detect = True self.data_terminal_ready = False self.ready_to_send = False # Start off with no flow control. self.flow_control = FTDIFlowControl.NO_FLOW_CONTROL
# # Request handlers. # @vendor_request_handler(number=0) def handle_reset_request(self, request): log.debug("Received FTDI reset; assuming initial settings.") self.reset_ftdi() request.acknowledge() @vendor_request_handler(number=1) def handle_modem_ctrl_request(self, req): log.debug("received modem_ctrl request") dtr = bool(req.value & 0x0001) rts = bool(req.value & 0x0002) self.use_dtr = bool(req.value & 0x0100) self.use_rts = bool(req.value & 0x0200) if dtr: log.info("DTR set -- host appears to have connected via virtual serial.") else: log.info("DTR cleared -- host appears to have disconnected from virtual serial.") if self.use_dtr: self.data_terminal_ready = dtr if self.use_rts: self.ready_to_send = rts req.acknowledge() @vendor_request_handler(number=2) def handle_set_flow_ctrl_request(self, request): """ Control request to set up flow control. """ try: self.flow_control = FTDIFlowControl(request.value) if self.flow_control: log.info(f"Host has set up {self.flow_control.name} flow control.") else: log.info(f"Host has disabled flow control.") request.acknowledge() except KeyError: request.stall() @vendor_request_handler(number=3) def handle_set_baud_rate_request(self, request): """ Control request to set our baud rate. """ if request.value > 9: log.warning("Host specified an unknown baud rate value.") request.acknowledge() return # For most values, the FTDI device uses the value to set the baud divisor, # such that 0 = 300, 1 = 600, etc. if request.value < 8: self.baud_rate = 300 * (2 ** request.value) # For values of 8/9, it jumps up to hit two more standard bauds. elif request.value == 8: self.baud_rate = 57600 elif request.value == 9: self.baud_rate = 115200 log.info(f"Host set baud rate to {self.baud_rate}.") request.acknowledge() @vendor_request_handler(number=4) def handle_set_data_request(self, request): log.debug("received set_data request") request.acknowledge() @vendor_request_handler(number=5) def handle_get_modem_status_request(self, request): """ Handles requests for the FTDI device's modem status. """ # Currently, we're emulating the original FTDI SIO, so we only provide # a single byte of status. Otherwise, we'd have an second byte with line status. response = \ (1 << 4) if self.clear_to_send else 0 | \ (1 << 5) if self.data_set_ready else 0 | \ (1 << 6) if self.ring_detect else 0 | \ (1 << 7) if self.line_status_detect else 0 request.reply((response,)) @vendor_request_handler(number=6) def handle_set_event_char_request(self, request): log.debug("received set_event_char request") request.acknowledge() @vendor_request_handler(number=7) def handle_set_error_char_request(self, request): log.debug("received set_error_char request") request.acknowledge() @vendor_request_handler(number=9) def handle_set_latency_timer_request(self, request): log.debug("received set_latency_timer request") request.acknowledge() @vendor_request_handler(number=10) def handle_get_latency_timer_request(self, request): log.debug("received get_latency_timer request") # Per Travis Goodspeed, this is a "bullshit value". request.reply(b'\x01') @vendor_request_handler(number=144) def handle_read_eeprom_request(self, request): log.info(f"received read_eeprom request at index {request.index}") if 0 <= request.index < len(self.eeprom_data): data_word = self.eeprom_data[request.index] response_bytes = struct.pack('>H', data_word) request.reply(response_bytes) log.debug(f"Handled EEPROM read at index {request.index}: sent {response_bytes.hex()}") elif request.index == 66: request.reply(struct.pack('>H', 0x0000)) log.debug(f"Handled EEPROM read at index {request.index}: sent 0000") elif request.index < 128: request.reply(struct.pack('>H', 0xFFFF)) log.debug(f"Handled EEPROM read at index {request.index}: sent FFFF") else: request.stall() log.info(f"EEPROM read at out-of-bounds index {request.index}: stalled") # # Internal event handlers. #
[docs] def handle_data_received(self, endpoint, data): """ Called back whenever data is received. """ log.debug(f"received {len(data)} bytes on {endpoint}") self.handle_serial_data_received(data)
# # User I/O interface. #
[docs] async def wait_for_host(self): """ Waits until the host connects by waiting for DTR assertion. """ # Wait for the host to assert DTR. while not self.data_terminal_ready: await asyncio.sleep(0.1)
[docs] def handle_serial_data_received(self, data): """ Callback executed when serial data is received. Subclasses should override this to capture data from the host. """ log.debug(f"Received serial data: {data}")
[docs] def transmit(self, data: Union[str, bytes], *, blocking: bool = False, adjust_endings: bool = True): """ Transmits a block of data over the provided FTDI link to the host. Parameters: data -- The data to be sent. blocking -- If true, this method will wait for completion before returning. adjust_endings -- If true, line endings will be adjusted before sending. """ FTDI_PAYLOAD_LENGTH = 62 # If this isn't a set of raw bytes, encode it into bytes. if hasattr(data, 'encode'): if adjust_endings: data = data.replace("\n", "\r\n") data = data.encode('utf-8') # Packetize and send the relevant data. data = bytearray(data) while data: packet = data[0:FTDI_PAYLOAD_LENGTH] del data[0:FTDI_PAYLOAD_LENGTH] self._transmit_packet(packet, blocking=blocking)
def _transmit_packet(self, data: bytes, *, blocking: bool = False): """ Sends a single packet of up to 63 data bytes over our link. """ # Generate an FTDI packet. packet = bytearray() # Our first/header byte contains the payload length in bits [7:2], and a packet ID of 01 in [1:0]. packet.append((len(data) << 2) | 0b01) packet.append(0) # The remainder of the packet is our payload. packet.extend(data) self.send(IN_ENDPOINT, packet, blocking=blocking)
if __name__ == "__main__": default_main(FTDIDevice)