# MoondancerApp.py
import sys
import time
import codecs
import enum
import traceback
from typing import List, Tuple
from ..core import *
from ..device import USBDevice
from ..configuration import USBConfiguration
from ..request import USBControlRequest
from ..types import DeviceSpeed, USBDirection
from ..logging import log
from .base import FacedancerBackend
# Quirk flags
[docs]
class QuirkFlag(enum.IntFlag):
MANUAL_SET_ADDRESS: int = 0x01
# Cynthion interrupt events
[docs]
class InterruptEvent:
USB_BUS_RESET: int = 10
USB_RECEIVE_CONTROL: int = 11
USB_RECEIVE_PACKET: int = 12
USB_SEND_COMPLETE: int = 13
[docs]
def __init__(self, data: Tuple[int, int]):
"""
Parses a tuple of two bytes representing an interrupt event into an InterruptEvent.
Args:
data : A tuple of two bytes. The first byte is the interrupt code, the second is the endpoint number.
"""
if len(data) != 2:
log.error(f"Invalid length for InterruptEvent: {len(data)}")
raise ValueError(f"Invalid length for InterruptEvent: {len(data)}")
event = data[0]
endpoint_number = data[1]
if event not in [
InterruptEvent.USB_BUS_RESET,
InterruptEvent.USB_RECEIVE_CONTROL,
InterruptEvent.USB_RECEIVE_PACKET,
InterruptEvent.USB_SEND_COMPLETE
]: raise ValueError(f"Unknown InterruptEvent id: {event}")
self.event = event
self.endpoint_number = endpoint_number
def __eq__(self, rhs):
return self.event == rhs
def __repr__(self):
name = "UNKNOWN"
if self.event == InterruptEvent.USB_BUS_RESET:
name = "USB_BUS_RESET"
elif self.event == InterruptEvent.USB_RECEIVE_CONTROL:
name = "USB_RECEIVE_CONTROL"
elif self.event == InterruptEvent.USB_RECEIVE_PACKET:
name = "USB_RECEIVE_PACKET"
elif self.event == InterruptEvent.USB_SEND_COMPLETE:
name = "USB_SEND_COMPLETE"
return f"{name} {self.endpoint_number}"
#
# Moondancer backend implementation
#
[docs]
class MoondancerApp(FacedancerApp, FacedancerBackend):
"""
Backend for using Cynthion devices as Facedancers.
"""
app_name = "Moondancer"
# Number of supported USB endpoints.
SUPPORTED_ENDPOINTS = 16
[docs]
def __init__(self, device: USBDevice=None, verbose: int=0, quirks: List[str]=[]):
"""
Sets up a new Cynthion-backed Facedancer (Moondancer) application.
Args:
device : The Cynthion device that will act as our Moondancer.
verbose : The verbosity level of the given application.
"""
log.info("Using the Moondancer backend.")
import cynthion
if device is None:
device = cynthion.Cynthion()
self.device = device
self.device.comms.get_exclusive_access()
FacedancerApp.__init__(self, device, verbose)
self.connected_device = None
# Grab the raw API object from the Cynthion object.
# This has the low-level RPCs used for raw USB control.
self.api = self.device.apis.moondancer
# Initialize a dictionary that will store the last setup
# whether each endpoint is currently stalled.
self.endpoint_stalled = {}
for i in range(self.SUPPORTED_ENDPOINTS):
self.endpoint_stalled[i] = False
# Assume a max packet size of 64 until configured otherwise.
self.max_packet_size_ep0 = 64
# Start off by assuming we're not waiting for an OUT control transfer's
# data stage. # See handle_setup_complete_on_endpoint for details.
self.pending_control_request = None
# Store a reference to the device's active configuration,
# which we'll use to know which endpoints we'll need to check
# for data transfer readiness.
self.configuration = None
#
# Store our list of quirks to handle.
#
if quirks:
self.quirks = quirks
else:
self.quirks = []
# Maintain a list of configured endpoints with form: (address, max_packet_size, USBTransferType)
self.configured_endpoints = dict()
# - Facedancer backend methods --------------------------------------------
[docs]
@classmethod
def appropriate_for_environment(cls, backend_name: str) -> bool:
"""
Determines if the current environment seems appropriate
for using the Moondancer backend.
"""
# Check: if we have a backend name other than moondancer,
# the user is trying to use something else. Abort!
if backend_name and backend_name != "cynthion":
return False
# If we're not explicitly trying to use something else,
# see if there's a connected Cynthion.
try:
import cynthion
device = cynthion.Cynthion()
return device.supports_api('moondancer')
except ImportError:
log.info("Skipping Cynthion-based devices, as the cynthion python module isn't installed.")
return False
except IOError:
log.warning("Found Cynthion-based device, but could not access it. (Check permissions?)")
return False
except:
return False
[docs]
def get_version(self):
"""
Returns information about the active Moondancer version.
"""
# TODO: Return the Cynthion software version, or something indicating
# the Cynthion API number?
raise NotImplementedError()
[docs]
def connect(self, usb_device: USBDevice, max_packet_size_ep0: int=64, device_speed: DeviceSpeed=DeviceSpeed.FULL):
"""
Prepares Cynthion to connect to the target host and emulate
a given device.
Args:
usb_device : The USBDevice object that represents the device to be
emulated.
"""
if device_speed not in [DeviceSpeed.FULL, DeviceSpeed.HIGH]:
log.warning(f"Moondancer only supports USB Full and High Speed. Ignoring requested speed: {device_speed.name}")
log.debug(f"moondancer.connect(max_packet_size_ep0:{max_packet_size_ep0}, device_speed:{device_speed}, quirks:{self.quirks})")
self.max_packet_size_ep0 = max_packet_size_ep0
# compute our quirk flags
quirks = 0
if 'manual_set_address' in self.quirks:
log.warning("Handling SET_ADDRESS on the target host side!")
quirks |= QuirkFlag.MANUAL_SET_ADDRESS
# connect to target host
self.api.connect(self.max_packet_size_ep0, device_speed, quirks)
self.connected_device = usb_device
# get device name
device_name = f"{type(self.connected_device).__module__}.{type(self.connected_device).__qualname__}"
log.info(f"Connected {device_speed.name} speed device '{device_name}' to target host.")
[docs]
def disconnect(self):
""" Disconnects Cynthion from the target host. """
log.info("Disconnecting from target host.")
self.device.comms.release_exclusive_access()
# disconnect from target host
self.api.disconnect()
self.connected_device = None
[docs]
def reset(self):
"""
Triggers the Cynthion to handle its side of a bus reset.
"""
log.debug(f"moondancer.bus_reset()")
self.api.bus_reset()
[docs]
def set_address(self, address: int, defer: bool=False):
"""
Sets the device address of Moondancer. Usually only used during
initial configuration.
Args:
address : The address that Moondancer should assume.
defer : True iff the set_address request should wait for an active transaction to finish.
"""
log.debug(f"moondancer.set_address({address}, {defer})")
self.api.set_address(address, 1 if defer else 0)
[docs]
def read_from_endpoint(self, endpoint_number: int) -> bytes:
"""
Reads a block of data from the given endpoint.
Args:
endpoint_number : The number of the OUT endpoint on which data is to be rx'd.
"""
log.debug(f"moondancer.read_from_endpoint({endpoint_number})")
# Read from the given endpoint...
data = self.api.read_endpoint(endpoint_number)
# Re-enable OUT interface to receive data again...
self.api.ep_out_interface_enable()
log.trace(f" moondancer.api.read_endpoint({endpoint_number}) -> {len(data)} '{data}'")
# Finally, return the result.
return data
[docs]
def send_on_control_endpoint(self, endpoint_number: int, in_request: USBControlRequest, data: bytes, blocking: bool=True):
"""
Sends a collection of USB data in response to a IN control request by the host.
Args:
endpoint_number : The number of the IN endpoint on which data should be sent.
requested_length : The number of bytes requested by the host.
data : The data to be sent.
blocking : If true, this function should wait for the transfer to complete.
"""
requested_length = in_request.length
self.api.write_control_endpoint(endpoint_number, requested_length, blocking, bytes(data))
log.debug(f"moondancer.send_on_control_endpoint({endpoint_number}, {requested_length}, {len(data)}, {blocking})")
log.trace(f" moondancer.api.write_control_endpoint({endpoint_number}, {requested_length}, {blocking}, {len(data)})")
[docs]
def send_on_endpoint(self, endpoint_number: int, data: bytes, blocking: bool=True):
"""
Sends a collection of USB data on a given endpoint.
Args:
endpoint_number : The number of the IN endpoint on which data should be sent.
data : The data to be sent.
blocking : If true, this function will wait for the transfer to complete.
"""
self.api.write_endpoint(endpoint_number, blocking, bytes(data))
log.debug(f"moondancer.send_on_endpoint({endpoint_number}, {len(data)}, {blocking})")
log.trace(f" moondancer.api.write_endpoint({endpoint_number}, {blocking}, {len(data)})")
# TODO this is only used by USBProxy - replace with "backend.ep_prime_for_receive" and "backend.send_zlp"
[docs]
def ack_status_stage(self, direction: USBDirection=USBDirection.OUT, endpoint_number:int =0, blocking: bool=False):
"""
Handles the status stage of a correctly completed control request,
by priming the appropriate endpoint to handle the status phase.
Args:
direction : Determines if we're ACK'ing an IN or OUT vendor request.
(This should match the direction of the DATA stage.)
endpoint_number : The endpoint number on which the control request
occurred.
blocking : True if we should wait for the ACK to be fully issued
before returning.
"""
log.debug(f"moondancer.ack_status_stage({direction.name}, {endpoint_number}, {blocking})")
if direction == USBDirection.OUT: # HOST_TO_DEVICE
# If this was an OUT request, we'll prime the output buffer to
# respond with the ZLP expected during the status stage.
self.api.write_endpoint(endpoint_number, blocking, bytes([]))
log.trace(f" moondancer.api.write_endpoint({endpoint_number}, {blocking}, [])")
else: # DEVICE_TO_HOST (IN)
# If this was an IN request, we'll need to set up a transfer descriptor
# so the status phase can operate correctly. This effectively reads the
# zero length packet from the STATUS phase.
self.api.ep_out_prime_receive(endpoint_number)
log.trace(f" moondancer.api.ep_out_prime_receive({endpoint_number})")
[docs]
def stall_endpoint(self, endpoint_number:int, direction: USBDirection=USBDirection.OUT):
"""
Stalls the provided endpoint, as defined in the USB spec.
Args:
endpoint_number : The number of the endpoint to be stalled.
"""
endpoint_address = (endpoint_number | 0x80) if direction else endpoint_number
log.debug(f"Stalling EP{endpoint_number} {USBDirection(direction).name} (0x{endpoint_address:x})")
# Mark endpoint number as stalled.
self.endpoint_stalled[endpoint_number] = True
# Stall endpoint address.
if direction:
self.api.stall_endpoint_in(endpoint_number)
log.debug(f" moondancer.api.stall_endpoint_in({endpoint_number})")
else:
self.api.stall_endpoint_out(endpoint_number)
log.debug(f" moondancer.api.stall_endpoint_out({endpoint_number})")
[docs]
def clear_halt(self, endpoint_number: int, direction: USBDirection):
""" Clears a halt condition on the provided non-control endpoint.
Args:
endpoint_number : The endpoint number
direction : The endpoint direction; or OUT if not provided.
"""
endpoint_address = (endpoint_number | 0x80) if direction else endpoint_number
log.debug(f"Clearing halt EP{endpoint_number} {USBDirection(direction).name} (0x{endpoint_address:x})")
self.api.clear_feature_endpoint_halt(endpoint_number, direction)
log.debug(f" moondancer.api.clear_feature_endpoint_halt({endpoint_number}, {direction})")
[docs]
def service_irqs(self):
"""
Core routine of the Facedancer execution/event loop. Continuously monitors the
Moondancer's execution status, and reacts as events occur.
"""
# Get latest interrupt events
events: List[Tuple[int, int]] = self.api.get_interrupt_events()
# Handle interrupt events.
if len(events) > 0:
# gcp doesn't seem to return a nested tuple if it's only one event
if isinstance(events[0], int):
events = [ events ]
parsed_events = [InterruptEvent(event) for event in events]
for event in parsed_events:
log.debug(f"MD IRQ => {event}")
if event == InterruptEvent.USB_BUS_RESET:
self.handle_bus_reset()
elif event == InterruptEvent.USB_RECEIVE_CONTROL:
self.handle_receive_control(event.endpoint_number)
elif event == InterruptEvent.USB_RECEIVE_PACKET and event.endpoint_number == 0:
# TODO support endpoints other than EP0
self.handle_receive_control_packet(event.endpoint_number)
elif event == InterruptEvent.USB_RECEIVE_PACKET:
self.handle_receive_packet(event.endpoint_number)
elif event == InterruptEvent.USB_SEND_COMPLETE:
self.handle_send_complete(event.endpoint_number)
else:
log.error(f"Unhandled interrupt event: {event}")
# Check EP_IN NAK status for pending data requests
else:
nak_status = self.api.get_nak_status()
if nak_status != 0:
self.handle_ep_in_nak_status(nak_status)
# - Interrupt event handlers ----------------------------------------------
# USB0_BUS_RESET
[docs]
def handle_bus_reset(self):
"""
Triggers Moondancer to perform its side of a bus reset.
"""
if self.connected_device:
self.connected_device.handle_bus_reset()
else:
self.api.bus_reset()
# USB0_RECEIVE_CONTROL
[docs]
def handle_receive_control(self, endpoint_number: int):
"""
Handles a known outstanding control event on a given endpoint.
endpoint_number: The endpoint number for which a control event should be serviced.
"""
log.debug(f"handle_receive_control({endpoint_number})")
# HACK: to maintain API compatibility with the existing facedancer API,
# we need to know if a stall happens at any point during our handler.
self.endpoint_stalled[endpoint_number] = False
# Read the data from the SETUP stage...
data = bytearray(self.api.read_control())
request = self.connected_device.create_request(data)
log.debug(f" moondancer.api.read_control({endpoint_number}) -> {len(data)} '{request}'")
is_out = request.get_direction() == USBDirection.OUT # HOST_TO_DEVICE
has_data = (request.length > 0)
log.trace(f" is_out:{is_out} has_data:{has_data}")
# Special case: if this is an OUT request with a data stage, we won't
# handle the request until the data stage has been completed. Instead,
# we'll stash away the data received in the setup stage, prime the
# endpoint for the data stage, and then wait for the data stage to
# complete, triggering a corresponding code path in
# in handle_transfer_complete_on_endpoint.
if is_out and has_data:
log.debug(f" setup packet has data - queueing read")
self.pending_control_request = request
self.api.ep_out_prime_receive(endpoint_number)
return
# Pass the request to the emulated device for handling.
log.trace(f" connected_device.handle_request({request})")
self.connected_device.handle_request(request)
# If it was an IN request with a data stage we now need to
# prime the endpoint to receive a ZLP from the host
# acknowledging receipt of our response.
if has_data and not is_out and not self.endpoint_stalled[endpoint_number]:
log.debug(f" CONTROL IN -> prime ep to receive zlp")
self.api.ep_out_prime_receive(endpoint_number)
# USB0_RECEIVE_PACKET(0)
[docs]
def handle_receive_control_packet(self, endpoint_number: int):
log.debug(f"moondancer.handle_receive_control_packet({endpoint_number}) pending:{self.pending_control_request}")
# Handle packet if we don't have a pending control request
if not self.pending_control_request:
data = self.api.read_endpoint(endpoint_number)
if len(data) == 0:
# It's a zlp following an IN control transfer, re-enable interface for reception on other endpoints.
self.api.ep_out_interface_enable()
else:
log.error(f"Discarding {len(data)} bytes on control endpoint with no pending control request")
return
# We have a pending control request with a data stage...
# Read the rest of the data from the endpoint, completing the control request.
new_data = self.api.read_endpoint(endpoint_number)
log.debug(f" handling control data stage: {len(new_data)} bytes")
log.trace(f" moondancer.api.read_endpoint({endpoint_number}) -> {len(new_data)}")
if len(new_data) == 0:
# It's a zlp following a control IN transfer, re-enable interface for reception on other endpoints.
self.api.ep_out_interface_enable()
log.debug(f"ZLP ending Control IN transfer on ep: {endpoint_number}")
return
# Append our new data to the pending control request.
self.pending_control_request.data.extend(new_data)
all_data_received = len(self.pending_control_request.data) == self.pending_control_request.length
is_short_packet = len(new_data) < self.max_packet_size_ep0
if all_data_received or is_short_packet:
# Handle the completed setup request...
self.connected_device.handle_request(self.pending_control_request)
# And clear our pending setup data.
self.pending_control_request = None
# Finally, re-enable interface for reception on other endpoints.
self.api.ep_out_interface_enable()
return
# Finally, re-prime our control endpoint to receive the rest of the control data.
self.api.ep_out_prime_receive(endpoint_number)
# USB0_RECEIVE_PACKET(1...15)
[docs]
def handle_receive_packet(self, endpoint_number: int):
"""
Handles a known-completed transfer on a given endpoint.
Args:
endpoint_number : The endpoint number for which the transfer should be serviced.
"""
log.debug(f"moondancer.handle_receive_packet({endpoint_number})")
# Read the data from the endpoint
data = self.api.read_endpoint(endpoint_number)
log.trace(f" moondancer.api.read_endpoint({endpoint_number}) -> {len(data)}")
# Ignore it if it's a ZLP ack as Facedancer devices don't handle it.
if len(data) == 0:
# Finally, Prime endpoint to receive again.
self.api.ep_out_interface_enable()
log.debug(f" ZLP ending Bulk IN transfer on ep: {endpoint_number}")
return
# Pass it to the device's handler
self.connected_device.handle_data_available(endpoint_number, data)
# Finally, re-enable other OUT endpoints so we can receive on them again.
self.api.ep_out_interface_enable()
# USB0_SEND_COMPLETE
[docs]
def handle_send_complete(self, endpoint_number: int):
log.debug(f"handle_send_complete({endpoint_number})")
pass
# Handle pending data requests on EP_IN
[docs]
def handle_ep_in_nak_status(self, nak_status: int):
nakked_endpoints = [epno for epno in range(self.SUPPORTED_ENDPOINTS) if (nak_status >> epno) & 1]
for endpoint_number in nakked_endpoints:
if endpoint_number != 0:
log.trace(f"Received IN NAK on ep{endpoint_number}")
self.connected_device.handle_nak(endpoint_number)