# GreatDancerApp.py
import sys
import time
import codecs
import traceback
from ..core import *
from ..types import *
from ..logging import log
from .base import FacedancerBackend
[docs]
class GreatDancerApp(FacedancerApp, FacedancerBackend):
    """
    Facedancer backend that drives a GreatFET board as the emulating device.
    """

    app_name = "GreatDancer"
    app_num = 0  # Carries no meaning for this backend.

    # Masks for the bits of the interrupt status word (USBSTS) that
    # service_irqs reacts to.
    USBSTS_D_UI = 0x00000001    # bit 0
    USBSTS_D_URI = 0x00000040   # bit 6
    USBSTS_D_NAKI = 0x00010000  # bit 16

    # Number of USB endpoints scanned when walking the status bitmaps.
    # TODO: bump this up when we develop support using USB0 (cables flipped)
    SUPPORTED_ENDPOINTS = 4

    # Transfer directions.
    HOST_TO_DEVICE = 0
    DEVICE_TO_HOST = 1

    # Selector values handed to api.get_status to choose which status word
    # is fetched.
    GET_USBSTS = 0
    GET_ENDPTSETUPSTAT = 1
    GET_ENDPTCOMPLETE = 2
    GET_ENDPTSTATUS = 3
    GET_ENDPTNAK = 4

    # Quirk bit flags; these are OR'd together and passed to api.connect.
    QUIRK_MANUAL_SET_ADDRESS = 1
[docs]
@classmethod
def appropriate_for_environment(cls, backend_name):
    """
    Determines if the current environment seems appropriate
    for using the GreatDancer backend.

    Args:
        backend_name : The backend name the user asked for, or a falsy
                       value if no backend was requested explicitly.

    Returns: False if a backend other than "greatfet" was requested, if the
        greatfet module cannot be imported, or if probing for a board fails;
        otherwise the result of the board's supports_api('greatdancer').
    """

    # An explicit request for any other backend rules us out immediately.
    if backend_name and backend_name != "greatfet":
        return False

    # Otherwise, probe for a connected GreatFET exposing the API we need.
    try:
        import greatfet
        gf = greatfet.GreatFET()
        return gf.supports_api('greatdancer')
    except ImportError:
        log.info("Skipping GreatFET-based devices, as the greatfet python module isn't installed.")
        return False
    except Exception as e:
        # Probing is best-effort, so any ordinary failure just means "not
        # appropriate". Catching Exception (rather than using a bare except)
        # lets KeyboardInterrupt and SystemExit propagate.
        log.debug(f"Skipping GreatFET-based devices, as probing for one failed: {e}")
        return False
[docs]
def __init__(self, device=None, verbose=0, quirks=None):
    """
    Builds a new GreatFET-backed Facedancer (GreatDancer) application.

    device: The GreatFET object to drive; when None, a new
            greatfet.GreatFET() is created.
    verbose: The verbosity level, forwarded to FacedancerApp.__init__.
    quirks: Optional collection of quirk names; a falsy value is stored
            as an empty list.
    """
    import greatfet

    # Fall back to a freshly created GreatFET when none was supplied.
    board = greatfet.GreatFET() if device is None else device

    self.device = board
    self.device.comms.get_exclusive_access()

    FacedancerApp.__init__(self, board, verbose)
    self.connected_device = None

    # The raw API object carries the low-level RPCs used for USB control.
    self.api = self.device.apis.greatdancer

    # Per-endpoint flag recording whether the endpoint is currently stalled.
    self.endpoint_stalled = {
        endpoint: False for endpoint in range(self.SUPPORTED_ENDPOINTS)
    }

    # Until told otherwise, assume EP0 uses 64-byte packets.
    self.max_packet_size_ep0 = 64

    # No OUT control transfer is waiting on its data stage yet; see
    # _handle_setup_event_on_endpoint for where this gets populated.
    self.pending_control_request = None

    # The device's active configuration, consulted to learn which
    # endpoints need checking for transfer readiness.
    self.configuration = None

    # The quirks we've been asked to handle.
    self.quirks = quirks if quirks else []
[docs]
def init_commands(self):
    """
    No-op retained for API compatibility; the GreatDancer needs no
    command initialization.
    """
    return None
[docs]
def get_version(self):
    """
    Would report the active GreatDancer version; currently unimplemented.

    Raises: NotImplementedError, always.
    """
    # TODO: Return the GreatFET software version, or something indicating
    # the GreatFET API number?
    raise NotImplementedError
[docs]
def ack_status_stage(self, direction=HOST_TO_DEVICE, endpoint_number=0, blocking=False):
    """
    Completes the status stage of a successfully handled control request
    by priming the endpoint in the direction the status phase needs.

    Args:
        direction : Whether we're ACK'ing an IN or OUT request.
                    (This should match the direction of the DATA stage.)
        endpoint_number : The endpoint number on which the control request
                          occurred.
        blocking : Passed through to send_on_endpoint for OUT requests;
                   True if we should wait for the ACK to be fully issued
                   before returning. Not consulted for IN requests, which
                   go through read_from_endpoint.
    """
    if direction != self.HOST_TO_DEVICE:
        # IN request: set up a read so the zero-length packet of the
        # status phase has somewhere to land.
        self.read_from_endpoint(endpoint_number)
        return

    # OUT request: answer the status stage with a zero-length packet.
    self.send_on_endpoint(endpoint_number, data=[], blocking=blocking)
def _generate_endpoint_config_arguments(self, config):
"""
Generates the data content for an Endpoint Configuration command that will
set up the GreatDancer's endpoints to match the active configuration.
Args:
config : A USBConfiguration object that represents the configuration being
applied to the GreatDancer.
"""
arguments = []
# If our configuration is None, there's nothing to configure; bail out.
if config is None:
return arguments
for interface in config.get_interfaces():
for endpoint in interface.get_endpoints():
log.info(f"Configuring {endpoint}.")
triple = (endpoint.get_address(), endpoint.max_packet_size, endpoint.transfer_type,)
arguments.append(triple)
return arguments
[docs]
def connect(self, usb_device, max_packet_size_ep0=64, device_speed=DeviceSpeed.FULL):
    """
    Readies the GreatDancer to connect to the target host and emulate
    the given device.

    Args:
        usb_device : The USBDevice object that represents the device to be
                     emulated.
        max_packet_size_ep0 : Maximum packet size to use on endpoint zero.
        device_speed : Requested bus speed. Anything other than
                       DeviceSpeed.FULL only produces a warning and is
                       otherwise ignored.
    """
    if device_speed != DeviceSpeed.FULL:
        log.warning(f"GreatFET only supports USB Full Speed. Ignoring requested speed: {device_speed.name}")

    self.max_packet_size_ep0 = max_packet_size_ep0

    # Translate the quirk names we were given into the flag word that the
    # connect command takes.
    quirk_flags = 0
    wants_manual_set_address = 'manual_set_address' in self.quirks
    if wants_manual_set_address:
        log.info("Handling SET_ADDRESS on the host side!")
        quirk_flags = quirk_flags | self.QUIRK_MANUAL_SET_ADDRESS

    self.api.connect(self.max_packet_size_ep0, quirk_flags)
    self.connected_device = usb_device

    log.info("Connecting to host.")
[docs]
def disconnect(self):
    """ Detaches the GreatDancer from its target host. """
    log.info("Disconnecting from host.")

    # NOTE(review): exclusive access is given up before the disconnect
    # command is issued -- confirm this ordering is intended.
    comms = self.device.comms
    comms.release_exclusive_access()

    self.api.disconnect()
def _wait_until_ready_to_send(self, ep_num):
# If we're already ready, we don't need to do anything. Abort.
if self._is_ready_for_priming(ep_num, self.DEVICE_TO_HOST):
return
# Otherwise, wait until we're ready to send...
while not self._is_ready_for_priming(ep_num, self.DEVICE_TO_HOST):
pass
# ... and since we've blocked the app from cleaning up any transfer
# descriptors automatically by spinning in this thread, we'll clean up
# the relevant transfers here.
self._clean_up_transfers_for_endpoint(ep_num, self.DEVICE_TO_HOST)
[docs]
def send_on_endpoint(self, ep_num, data, blocking=True):
    """
    Sends a collection of USB data on a given endpoint.

    Args:
        ep_num : The number of the IN endpoint on which data should be sent.
        data : The data to be sent; anything bytes() accepts.
        blocking : If true, this function will wait for the transfer to complete.
    """
    # Convert exactly once. Converting separately for the log message and
    # for the send would let a one-shot iterable be drained by the log,
    # leaving an empty payload for the actual transfer.
    payload = bytes(data)

    log.trace(f"EP{ep_num}/IN: <- {payload}")

    self._wait_until_ready_to_send(ep_num)
    self.api.send_on_endpoint(ep_num, payload)

    # If we're blocking, wait until the transfer completes, then release
    # the transfer descriptors it used.
    if blocking:
        while not self._transfer_is_complete(ep_num, self.DEVICE_TO_HOST):
            pass

        self._clean_up_transfers_for_endpoint(ep_num, self.DEVICE_TO_HOST)
[docs]
def read_from_endpoint(self, ep_num):
    """
    Performs a blocking read of one transfer from the given endpoint.

    Args:
        ep_num : The number of the OUT endpoint on which data is to be rx'd.

    Returns: Whatever _finish_primed_read_on_endpoint yields for the endpoint.
    """
    # Kick off a nonblocking read...
    self._prime_out_endpoint(ep_num)

    # ... poll until the transfer is reported complete...
    while True:
        if self._transfer_is_complete(ep_num, self.HOST_TO_DEVICE):
            break

    # ... and collect the received data.
    received = self._finish_primed_read_on_endpoint(ep_num)
    return received
@staticmethod
def _endpoint_address(ep_num, direction):
"""
Returns the endpoint number that corresponds to a given direction
and address.
"""
if direction:
return ep_num | 0x80
else:
return ep_num
[docs]
def stall_endpoint(self, ep_num, direction=0):
    """
    Stalls the provided endpoint, as defined in the USB spec, and records
    the stall in endpoint_stalled.

    Args:
        ep_num : The number of the endpoint to be stalled.
        direction : Truthy for the IN side of the endpoint, falsy for OUT.
    """
    label = "OUT" if not direction else "IN"
    log.trace("Stalling EP{} {}".format(ep_num, label))

    # Remember the stall so control-request handling can tell it happened.
    self.endpoint_stalled[ep_num] = True

    address = self._endpoint_address(ep_num, direction)
    self.api.stall_endpoint(address)
[docs]
def stall_ep0(self, direction=0):
    """
    Shorthand for stalling endpoint zero, the control endpoint.

    Args:
        direction : Forwarded unchanged to stall_endpoint.
    """
    control_endpoint = 0
    self.stall_endpoint(control_endpoint, direction)
[docs]
def set_address(self, address, defer=False):
    """
    Assigns the GreatDancer its device address. Usually only used during
    initial configuration.

    Args:
        address : The address that the GreatDancer should assume.
        defer : True iff the set_address request should wait for an active
                transaction to finish. Sent to the device as 1 or 0.
    """
    defer_flag = int(bool(defer))
    self.api.set_address(address, defer_flag)
@staticmethod
def _decode_usb_register(transfer_result):
"""
Decodes a raw 32-bit register value from a form encoded
for transit as a USB control request.
Args:
transfer_result : The value returned by the vendor request.
Returns: The raw integer value of the given register.
"""
status_hex = codecs.encode(transfer_result[::-1], 'hex')
return int(status_hex, 16)
def _fetch_irq_status(self):
"""
Fetch the USB controller's pending-IRQ bitmask, which indicates
which interrupts need to be serviced.
Returns: A raw integer bitmap.
"""
return self.api.get_status(self.GET_USBSTS)
def _fetch_setup_status(self):
"""
Fetch the USB controller's "pending setup packet" bitmask, which
indicates which endpoints have setup packets to be read.
Returns: A raw integer bitmap.
"""
return self.api.get_status(self.GET_ENDPTSETUPSTAT)
def _handle_setup_events(self):
"""
Handles any outstanding setup events on the USB controller.
"""
# Determine if we have setup packets on any of our endpoints.
status = self._fetch_setup_status()
# If we don't, abort.
if not status:
return
# Otherwise, figure out which endpoints have outstanding setup events,
# and handle them.
for i in range(self.SUPPORTED_ENDPOINTS):
if status & (1 << i):
self._handle_setup_event_on_endpoint(i)
def _handle_setup_event_on_endpoint(self, endpoint_number):
"""
Handles a known outstanding setup event on a given endpoint.
Args:
endpoint_number : The endpoint number for which a setup event should be serviced.
"""
# HACK: to maintain API compatibility with the existing facedancer API,
# we need to know if a stall happens at any point during our handler.
self.endpoint_stalled[endpoint_number] = False
# Read the data from the SETUP stage...
data = bytearray(self.api.read_setup(endpoint_number))
request = self.connected_device.create_request(data)
# If this is an OUT request, handle the data stage,
# and add it to the request.
is_out = request.get_direction() == self.HOST_TO_DEVICE
has_data = (request.length > 0)
# Special case: if this is an OUT request with a data stage, we won't
# handle the request until the data stage has been completed. Instead,
# we'll stash away the data received in the setup stage, prime the
# endpoint for the data stage, and then wait for the data stage to
# complete, triggering a corresponding code path in
# in _handle_transfer_complete_on_endpoint.
if is_out and has_data:
self._prime_out_endpoint(endpoint_number)
self.pending_control_request = request
return
self.connected_device.handle_request(request)
if not is_out and not self.endpoint_stalled[endpoint_number]:
self.ack_status_stage(direction=self.DEVICE_TO_HOST)
def _fetch_transfer_status(self):
"""
Fetch the USB controller's "completed transfer" bitmask, which
indicates which endpoints have recently completed transactions.
Returns : A raw integer bitmap.
"""
return self.api.get_status(self.GET_ENDPTCOMPLETE)
def _transfer_is_complete(self, endpoint_number, direction):
"""
Returns true iff a given endpoint has just completed a transfer.
Can be used to check for completion of a non-blocking transfer.
Args:
endpoint_number : The endpoint number to be queried.
direction : The direction of the transfer. Should be self.HOST_TO_DEVICE or
self.DEVICE_TO_HOST.
"""
status = self._fetch_transfer_status()
# From the LPC43xx manual: out endpoint completions start at bit zero,
# while in endpoint completions start at bit 16.
out_is_ready = (status & (1 << endpoint_number))
in_is_ready = (status & (1 << (endpoint_number + 16)))
if direction == self.HOST_TO_DEVICE:
return out_is_ready
else:
return in_is_ready
def _handle_transfer_events(self):
"""
Handles any outstanding setup events on the USB controller.
"""
# Determine if we have ready packets on any of our endpoints.
status = self._fetch_transfer_status()
# If we don't, abort.
if not status:
return
# Figure out which endpoints have recently completed transfers,
# and clean up any transactions on those endpoints. It's important
# that this be done /before/ the _handle_transfer_complete... section
# below, as those can generate further events which will need the freed
# transfer descriptors.
# [Note that it's safe to clean up the transfer descriptors before reading,
# here-- the GreatFET's USB controller has transparently moved any data
# from OUT transactions into a holding buffer for us. Nice of it!]
for i in range(self.SUPPORTED_ENDPOINTS):
if status & (1 << i):
self._clean_up_transfers_for_endpoint(i, self.HOST_TO_DEVICE)
if status & (1 << (i + 16)):
self._clean_up_transfers_for_endpoint(i, self.DEVICE_TO_HOST)
# Now that we've cleaned up all relevant transfer descriptors, trigger
# any events that should occur due to the completed transaction.
for i in range(self.SUPPORTED_ENDPOINTS):
if status & (1 << i):
self._handle_transfer_complete_on_endpoint(i, self.HOST_TO_DEVICE)
if status & (1 << (i + 16)):
self._handle_transfer_complete_on_endpoint(i, self.DEVICE_TO_HOST)
# Finally, after completing all of the above, we may now have idle
# (unprimed) endpoints. For OUT endpoints, we'll need to re-prime them
# so we're ready for receipt; for IN endpoints, we'll want to give the
# emulated device a chance to provide new data.
self._handle_transfer_readiness()
def _finish_primed_read_on_endpoint(self, endpoint_number):
"""
Completes a non-blocking (primed) read on an OUT endpoint by reading any data
received since the endpoint was primed. See read_from_endpoint for an example
of proper use.
Args:
endpoint_number : The endpoint to read from.
"""
return self.api.finish_nonblocking_read(endpoint_number)
def _clean_up_transfers_for_endpoint(self, endpoint_number, direction):
"""
Cleans up any outstanding transfers on the given endpoint. This must be
called for each completed transaction so the relevant transfer descriptors
can be re-used.
There's no harm in calling this if a transaction isn't complete, but it _must_
be called at least once for each completed transaction.
Args:
endpoint_number : The endpoint number whose transfer descriptors should be cleaned
up.
direction : The endpoint direction for which TD's should be cleaned.
"""
# Ask the device to clean up any transaction descriptors related to the transfer.
self.api.clean_up_transfer(self._endpoint_address(endpoint_number, direction))
def _is_control_endpoint(self, endpoint_number):
"""
Returns true iff the given endpoint number corresponds to a control endpoint.
"""
# FIXME: Support control endpoints other than EP0.
return endpoint_number == 0
def _handle_transfer_complete_on_endpoint(self, endpoint_number, direction):
"""
Handles a known-completed transfer on a given endpoint.
Args:
endpoint_number : The endpoint number for which a setup event should be serviced.
"""
# If a transfer has just completed on an OUT endpoint, we've just received data
# that we need to handle.
if direction == self.HOST_TO_DEVICE:
# Special case: if we've just received data on a control endpoint,
# we're completing a control request.
if self._is_control_endpoint(endpoint_number):
# If we received a setup packet to handle, handle it.
if self.pending_control_request:
# Read the rest of the data from the endpoint, completing
# the control request.
new_data = self._finish_primed_read_on_endpoint(endpoint_number)
# Append our new data to the pending control request.
self.pending_control_request.data.extend(new_data)
all_data_received = len(self.pending_control_request.data) == self.pending_control_request.length
is_short_packet = len(new_data) < self.max_packet_size_ep0
if all_data_received or is_short_packet:
# Handle the completed setup request...
self.connected_device.handle_request(self.pending_control_request)
# And clear our pending setup data.
self.pending_control_request = None
else:
# Otherwise, re-prime the endpoint to grab the next packet.
self._prime_out_endpoint(endpoint_number)
# Typical case: this isn't a control endpoint, so we don't have a
# defined packet format. Read the data and issue the corresponding
# callback.
else:
data = self._finish_primed_read_on_endpoint(endpoint_number)
log.trace(f"EP{endpoint_number}/OUT -> {data}")
self.connected_device.handle_data_available(endpoint_number, data)
def _fetch_transfer_readiness(self):
"""
Queries the GreatFET for a bitmap describing the endpoints that are not
currently primed, and thus ready to be primed again.
"""
return self.api.get_status(self.GET_ENDPTSTATUS)
def _fetch_endpoint_nak_status(self):
"""
Queries the GreatFET for a bitmap describing the endpoints that have issued
a NAK since the last time this was checked.
"""
return self.api.get_status(self.GET_ENDPTNAK)
def _prime_out_endpoint(self, endpoint_number):
"""
Primes an out endpoint, allowing it to receive data the next time the host chooses to send it.
Args:
endpoint_number : The endpoint that should be primed.
"""
self.api.start_nonblocking_read(endpoint_number)
def _handle_transfer_readiness(self):
"""
Check to see if any non-control IN endpoints are ready to
accept data from our device, and handle if they are.
"""
# If we haven't been configured yet, we can't have any
# endpoints other than the control endpoint, and we don't n
if not self.configuration:
return
# Fetch the endpoint status.
status = self._fetch_transfer_readiness()
# Check the status of every endpoint /except/ endpoint zero,
# which is always a control endpoint and set handled by our
# control transfer handler.
for interface in self.configuration.get_interfaces():
for endpoint in interface.get_endpoints():
# Check to see if the endpoint is ready to be primed.
if self._is_ready_for_priming(endpoint.number, endpoint.direction):
# If this is an IN endpoint, we're ready to accept data to be
# presented on the next IN token.
if endpoint.direction == USBDirection.IN:
self.connected_device.handle_buffer_available(endpoint.number)
# If this is an OUT endpoint, we'll need to prime the endpoint to
# accept new data. This provides a place for data to go once the
# host sends an OUT token.
else:
self._prime_out_endpoint(endpoint.number)
def _is_ready_for_priming(self, ep_num, direction):
"""
Returns true iff the endpoint is ready to be primed.
Args:
ep_num : The endpoint number in question.
direction : The endpoint direction in question.
"""
# Fetch the endpoint status.
status = self._fetch_transfer_readiness()
ready_for_in = (not status & (1 << (ep_num + 16)))
ready_for_out = (not status & (1 << (ep_num)))
if direction == self.HOST_TO_DEVICE:
return ready_for_out
else:
return ready_for_in
@classmethod
def _has_issued_nak(cls, ep_nak, ep_num, direction):
"""
Interprets an ENDPTNAK status result to determine
whether a given endpoint has NAK'd.
Args:
ep_nak : The status work read from the ENDPTNAK register
ep_num : The endpoint number in question.
direction : The endpoint direction in question.
"""
in_nak = (ep_nak & (1 << (ep_num + 16)))
out_nak = (ep_nak & (1 << (ep_num)))
if direction == cls.HOST_TO_DEVICE:
return out_nak
else:
return in_nak
def _bus_reset(self):
    """
    Reacts to a bus reset issued by the host: the connected device is
    asked to handle it if there is one; otherwise the GreatDancer
    performs its side of the reset directly.
    """
    log.debug("Host issued bus reset.")

    device = self.connected_device
    if not device:
        self.api.bus_reset()
        return

    device.handle_bus_reset()
[docs]
def reset(self):
    """
    Has the GreatFET carry out its side of a bus reset.
    """
    api = self.api
    api.bus_reset()
def _handle_nak_events(self):
"""
Handles an event in which the GreatDancer has NAK'd an IN token.
"""
# If we haven't been configured yet, we can't have any
# endpoints other than the control endpoint, and we don't need to
# handle any NAKs.
if not self.configuration:
return
# Fetch the endpoint status.
status = self._fetch_endpoint_nak_status()
# Iterate over each usable endpoint.
for interface in self.configuration.get_interfaces():
for endpoint in interface.get_endpoints():
# Skip OUT endpoints
if endpoint.direction == self.HOST_TO_DEVICE:
continue
# If the endpoint has NAK'd, issued the relevant callback.
if self._has_issued_nak(status, endpoint.number, endpoint.direction):
self.connected_device.handle_nak(endpoint.number)
def _configure_endpoints(self, configuration):
"""
Configures the GreatDancer's endpoints to match the provided configuration.
Args:
configuration : The USBConfiguration object that describes the endpoints provided.
"""
endpoint_triplets = self._generate_endpoint_config_arguments(configuration)
# If we need to issue a configuration command, issue one.
# (If there are no endpoints other than control, this command will be
# empty, and we can skip this.)
if endpoint_triplets:
self.api.set_up_endpoints(*endpoint_triplets)
[docs]
def service_irqs(self):
    """
    Core routine of the Facedancer execution/event loop. Each call reads
    the GreatDancer's interrupt status once and reacts to the events
    flagged in it.
    """
    pending = self._fetch_irq_status()

    # Other bits that may be of interest:
    # D_SRI = start of frame received
    # D_PCI = port change detect (switched between low, full, high speed state)
    # D_SLI = device controller suspend
    # D_UEI = USB error; completion of transaction caused error, see usb1_isr in firmware
    # D_NAKI = both the tx/rx NAK bit and corresponding endpoint NAK enable are set

    # Each status bit we react to, paired with the handlers to run for it,
    # in the order they must run.
    dispatch = (
        (self.USBSTS_D_UI, (self._handle_setup_events, self._handle_transfer_events)),
        (self.USBSTS_D_URI, (self._bus_reset,)),
        (self.USBSTS_D_NAKI, (self._handle_nak_events,)),
    )

    for mask, handlers in dispatch:
        if pending & mask:
            for handler in handlers:
                handler()