# Facedancer.py
#
# Contains the core methods for working with a facedancer, inclduing methods
# necessary for autodetection.
# and GoodFETMonitorApp.
import os
from .errors import *
from .logging import log
[docs]
def FacedancerUSBApp(verbose=0, quirks=None):
"""
Convenience function that automatically creates a FacedancerApp
based on the BOARD environment variable and some crude internal
automagic.
Args:
verbose : Sets the verbosity level of the relevant app. Increasing
this from zero yields progressively more output.
"""
return FacedancerApp.autodetect(verbose, quirks)
[docs]
class FacedancerApp:
app_name = "override this"
app_num = 0x00
[docs]
@classmethod
def autodetect(cls, verbose=0, quirks=None):
"""
Convenience function that automatically creates the appropriate
subclass based on the BOARD environment variable and some crude internal
automagic.
Args:
verbose: Sets the verbosity level of the relevant app. Increasing
this from zero yields progressively more output.
"""
if 'BACKEND' in os.environ:
backend_name = os.environ['BACKEND'].lower()
else:
backend_name = None
# Iterate over each subclass of FacedancerApp until we find one
# that seems appropriate.
subclass = cls._find_appropriate_subclass(backend_name)
if subclass:
if verbose > 0:
log.info("Using {} backend.".format(subclass.app_name))
return subclass(verbose=verbose, quirks=quirks)
else:
log.error("FacedancerApp failed to autodetect any Facedancer devices.")
log.error("Try specifying a backend with: BACKEND=\"<backend name>\" <your app>")
raise DeviceNotFoundError("FacedancerApp failed to autodetect any Facedancer devices.")
@classmethod
def _find_appropriate_subclass(cls, backend_name):
# Recursive case: if we have any subnodes, see if they are
# feed them to this function.
for subclass in cls.__subclasses__():
# Check to see if the subnode has any appropriate children.
appropriate_class = subclass._find_appropriate_subclass(backend_name)
# If it does, that's our answer!
if appropriate_class:
return appropriate_class
# Base case: check the current node.
if cls.appropriate_for_environment(backend_name):
return cls
else:
return None
[docs]
@classmethod
def appropriate_for_environment(cls, backend_name=None):
"""
Returns true if the current class is likely to be the appropriate
class to connect to a facedancer given the board_name and other
environmental factors.
Args:
backend_name : The name of the backend, as typically retrieved from the BACKEND
environment variable, or None to try figuring things out based
on other environmental factors.
"""
return False
def __init__(self, device, verbose=0):
self.device = device
self.verbose = verbose
self.init_commands()
if self.verbose > 0:
log.info(self.app_name, "initialized")
[docs]
def init_commands(self):
pass
[docs]
def FacedancerUSBHostApp(verbose=0, quirks=None):
"""
Convenience function that automatically creates a FacedancerApp
based on the BOARD environment variable and some crude internal
automagic.
verbose: Sets the verbosity level of the relevant app. Increasing
this from zero yields progressively more output.
"""
return FacedancerUSBHost.autodetect(verbose, quirks)
[docs]
class FacedancerUSBHost:
"""
Base class for Facedancer host connections-- extended to provide actual
connections to each host.
"""
# TODO: remove this redundancy; these should be somewhere common
# Endpoint directions
ENDPOINT_DIRECTION_OUT = 0x00
ENDPOINT_DIRECTION_IN = 0x80
# Endpoint types
ENDPOINT_TYPE_CONTROL = 0
# Packet IDs
PID_SETUP = 2
PID_OUT = 0
PID_IN = 1
# USB Request Types
REQUEST_TYPE_STANDARD = 0
REQUEST_TYPE_CLASS = 1
REQUEST_TYPE_VENDOR = 2
REQUEST_TYPE_RESERVED = 3
# USB Request Recipients
REQUEST_RECIPIENT_DEVICE = 0
REQUEST_RECIPIENT_INTERFACE = 1
REQUEST_RECIPIENT_ENDPOINT = 2
REQUEST_RECIPIENT_OTHER = 3
# USB Standard Requests
STANDARD_REQUEST_GET_STATUS = 0
STANDARD_REQUEST_SET_ADDRESS = 5
STANDARD_REQUEST_GET_DESCRIPTOR = 6
STANDARD_REQUEST_SET_CONFIGURATION = 9
[docs]
@classmethod
def autodetect(cls, verbose=0, quirks=None):
"""
Convenience function that automatically creates the appropriate
subclass based on the BOARD environment variable and some crude internal
automagic.
Args:
verbose: Sets the verbosity level of the relevant app. Increasing
this from zero yields progressively more output.
"""
# TODO: Filter this out into some kind of autodetecting base class...
if 'BACKEND' in os.environ:
backend_name = os.environ['BACKEND'].lower()
else:
backend_name = None
# Iterate over each subclass of FacedancerApp until we find one
# that seems appropriate.
subclass = cls._find_appropriate_subclass(backend_name)
if subclass:
if verbose > 0:
log.info("Using {} backend.".format(subclass.app_name))
return subclass(verbose=verbose, quirks=quirks)
else:
log.error("FacedancerUSBHost failed to autodetect any Facedancer devices.")
log.error("Try specifying a backend with: BACKEND=\"<backend name>\" <your app>")
raise DeviceNotFoundError("FacedancerUSBHost failed to autodetect any Facedancer devices.")
@classmethod
def _find_appropriate_subclass(cls, backend_name):
# TODO: Filter this out into some kind of autodetecting base class...
# Recursive case: if we have any subnodes, see if they are
# feed them to this function.
for subclass in cls.__subclasses__():
# Check to see if the subnode has any appropriate children.
appropriate_class = subclass._find_appropriate_subclass(backend_name)
# If it does, that's our answer!
if appropriate_class:
return appropriate_class
# Base case: check the current node.
if cls.appropriate_for_environment(backend_name):
return cls
else:
return None
[docs]
@classmethod
def appropriate_for_environment(cls, backend_name=None):
"""
Returns true if the current class is likely to be the appropriate
class to connect to a facedancer given the board_name and other
environmental factors.
Args:
backend_name : The name of the backend, as typically retrieved from the BACKEND
environment variable, or None to try figuring things out based
on other environmental factors.
"""
return False
@classmethod
def _build_request_type(cls, is_in, req_type, recipient):
""" Builds the request type field for a USB request.
Args:
is_in : True iff this is a DEVICE-to-HOST request.
req_type : The type of request to be used.
recipient : The context in which this request should be interpreted.
Returns : a request_type byte
"""
request_type = 0
if is_in:
request_type |= cls.ENDPOINT_DIRECTION_IN
request_type |= (req_type << 5)
request_type |= (recipient)
return request_type
@classmethod
def _build_setup_request(cls, is_in, request_type, recipient, request, value, index, length):
""" Builds a setup request packet from the standard USB request fields. """
# Fields:
# uint8_t request_type;
# uint8_t request;
# uint16_t value;
# uint16_t index;
# uint16_t length;
def split(value):
value_high = value >> 8
value_low = value & 0xFF
return [value_low, value_high]
setup_request = [cls._build_request_type(is_in, request_type, recipient), request]
setup_request.extend(split(value))
setup_request.extend(split(index))
setup_request.extend(split(length))
return setup_request
[docs]
def control_request_in(self, request_type, recipient, request, value=0, index=0, length=0):
""" Performs an IN control request.
Args:
request_type : Determines if this is a standard, class, or vendor request. Accepts
a REQUEST_TYPE_* constant.
recipient : Determines the context in which this command is interpreted. Accepts
a REQUEST_RECIPIENT_* constant.
request : The request number to be performed.
value, index : The standard USB request arguments, to be included in the setup
packet. Their meaning varies depending on the request.
length : The maximum length of data expected in response, or 0 if we don't
expect any data back.
"""
# Create the raw setup request, and send it.
setup_request = self._build_setup_request(True, request_type, recipient,
request, value, index, length)
if self.verbose > 4:
log.info("Issuing setup packet: {}".format(setup_request))
self.send_on_endpoint(0, setup_request, True, data_packet_pid=0)
if self.verbose > 4:
log.info("Done.")
# If we have a data stage, issue it:
if length:
if self.verbose > 4:
log.info("Reading response... ")
data = self.read_from_endpoint(0, length, data_packet_pid=1)
if self.verbose > 4:
log.info("Got response: {}".format(data))
# and give the host an opportunity to ACK by sending a ZLP.
self.send_on_endpoint(0, [], data_packet_pid=1)
return data
else:
self.read_from_endpoint(0, 0, data_packet_pid=1)
[docs]
def control_request_out(self, request_type, recipient, request, value=0, index=0, data=[]):
""" Performs an OUT control request.
Args:
request_type : Determines if this is a standard, class, or vendor request. Accepts
a REQUEST_TYPE_* constant.
recipient : Determines the context in which this command is interpreted. Accepts
a REQUEST_RECIPIENT_* constant.
request : The request number to be performed.
value, index : The standard USB request arguments, to be included in the setup
packet. Their meaning varies depending on the request.
data : The data to be transmitted with this control request.
"""
# Create the raw setup request, and send it.
setup_request = self._build_setup_request(False, request_type, recipient,
request, value, index, len(data))
self.send_on_endpoint(0, setup_request, True)
# If we have a data stage, issue it:
if data:
self.send_on_endpoint(0, data)
# And try to read a ZLP from the host for ACK'ing purposes.
self.read_from_endpoint(0, 0, data_packet_pid=1)
[docs]
def initialize_device(self, apply_configuration=0, assign_address=0):
"""
Sets up a connection to a directly-attached USB device.
Args:
apply_configuration : If non-zero, the configuration with the given index will
be applied to the relevant device.
assign_address : If non-zero, the device will be assigned the given address
as part of the enumeration/initialization process.
"""
# TODO: support timeouts in waiting for a connection
# Repeatedly attempt to connect to any connected devices.
while not self.device_is_connected():
self.bus_reset()
# Assume the default device addresses, and read the device's speed.
self.last_device_address = 0
self.last_device_speed = self.current_device_speed()
# Set up the device to work.
if self.verbose > 3:
log.info("Initializing control endpoint...")
self.initialize_control_endpoint()
# Try to ask the device for its maximum packet size on EP0.
self.last_ep0_max_packet_size = self.read_ep0_max_packet_size()
# If we've been asked to assign an address,
# set the device's address, and reinitialize the control endpoint
# with the updated address.
if assign_address:
self.set_address(assign_address)
self.initialize_control_endpoint(max_packet_size=self.last_ep0_max_packet_size)
# If we're auto-configuring the device, read the full configuration descriptor,
# assign the first configuration, and then set up endpoints accordingly
if apply_configuration:
self.apply_configuration(apply_configuration)
[docs]
def get_descriptor(self, descriptor_type, descriptor_index,
language_id, max_length):
""" Reads up to max_length bytes of a device's descriptors. """
return self.control_request_in(
self.REQUEST_TYPE_STANDARD, self.REQUEST_RECIPIENT_DEVICE,
self.STANDARD_REQUEST_GET_DESCRIPTOR,
(descriptor_type << 8) | descriptor_index, language_id, max_length)
[docs]
def get_device_descriptor(self, max_length=18):
""" Returns the device's device descriptor. """
from .device import USBDevice
raw_descriptor = self.get_descriptor(USBDevice.DESCRIPTOR_TYPE_NUMBER, 0, 0, max_length)
return USBDevice.from_binary_descriptor(raw_descriptor)
[docs]
def read_ep0_max_packet_size(self):
""" Returns the device's reported maximum packet size on EP0, in a way appropriate for an barely-configured endpoint. """
device_descriptor = self.get_device_descriptor(max_length=8)
return device_descriptor.max_packet_size_ep0
[docs]
def get_configuration_descriptor(self, index=0, include_subordinates=True):
""" Returns the device's configuration descriptor.
Args:
include_subordinate : if true, subordinate descriptors will also be returned
"""
from .configuration import USBConfiguration
# Read just the raw configuration descriptor.
raw_descriptor = self.get_descriptor(USBConfiguration.DESCRIPTOR_TYPE_NUMBER, index, 0, USBConfiguration.DESCRIPTOR_SIZE_BYTES)
# If we want to include the subordinate descriptors, read-read the configuration descriptor with an updated length.
if include_subordinates:
from struct import unpack
total_descriptor_lengths = unpack('<H', raw_descriptor[2:4])[0]
raw_descriptor = self.get_descriptor(USBConfiguration.DESCRIPTOR_TYPE_NUMBER, index, 0, total_descriptor_lengths)
return USBConfiguration.from_binary_descriptor(raw_descriptor)
[docs]
def set_address(self, device_address):
""" Sets the device's address.
Note that all endpoints must be set up again after issuing the new address;
the easiest way to do this is to call apply_configuration().
Args:
device_address : the address to apply to the given device
"""
self.control_request_out(
self.REQUEST_TYPE_STANDARD, self.REQUEST_RECIPIENT_DEVICE,
self.STANDARD_REQUEST_SET_ADDRESS, value=device_address)
self.last_device_address = device_address
[docs]
def set_configuration(self, index):
""" Sets the device's active configuration.
Note that this does not configure the host for the given configuration.
Most of the time, you probably want apply_configuration, which does.
Args:
index : the index of the configuration to apply
"""
self.control_request_out(
self.REQUEST_TYPE_STANDARD, self.REQUEST_RECIPIENT_DEVICE,
self.STANDARD_REQUEST_SET_CONFIGURATION, value=index)
[docs]
def apply_configuration(self, index, set_configuration=True):
""" Applies a device's configuration. Necessary to use endpoints other
than the control endpoint.
Args:
index : The configuration index to apply.
set_configuration : If true, also informs the device of the change.
Setting this to false can allow the host to update its view of all
endpoints without communicating with the device -- e.g. to update the
device's address.
"""
# Read the full set of descriptors for the given configuration...
# TODO: don't assume that the indices increment nicely from 1?
configuration = self.get_configuration_descriptor(index - 1)
# If we're informing the device of the change, do so.
if set_configuration:
self.set_configuration(index)
# Locally, set up our endpoints to handle device communication.
for interface in configuration.interfaces.values():
for endpoint in interface.endpoints.values():
self.set_up_endpoint(endpoint)
[docs]
def handle_events(self):
self.service_irqs()
[docs]
class FacedancerBasicScheduler(object):
"""
Most basic scheduler for Facedancer devices-- and the schedule which is
created implicitly if no other scheduler is provided. Executes each of its
tasks in order, over and over.
"""
do_exit = False
def __init__(self):
self.tasks = []
self.do_exit = False
[docs]
def add_task(self, callback):
"""
Adds a facedancer task to the scheduler, which will be called
repeatedly according to the internal scheduling algorithm
callback: The callback to be scheduled.
"""
self.tasks.append(callback)
[docs]
def run(self):
"""
Run the main scheduler stack.
"""
self.do_exit = False
while not self.do_exit:
for task in self.tasks:
task()
[docs]
def stop(self):
"""
Stop the scheduler on next loop.
"""
self.do_exit = True