#
# This file is part of Facedancer.
#
""" Host support for GreatFET-base devices. """
import sys
import time
import codecs
import struct
from ..core import *
from ..endpoint import USBEndpoint
from ..types import *
from ..logging import log
# [docs]  (documentation-link residue from HTML extraction; not part of the program)
class GreatDancerHostApp(FacedancerUSBHost):
    """
    Class that represents a GreatFET-based USB host.

    Every operation is carried out by issuing vendor requests through
    ``self.device.comms``; the request numbers are taken from
    ``greatfet.protocol.vendor_requests``.
    """

    app_name = "GreatDancer Host"

    # Indices of the status registers readable via USBHOST_GET_STATUS.
    PORT_STATUS_REG = 0
    READ_STATUS_REG = 1
    WRITE_STATUS_REG = 2

    # Single-bit flags within the port status register.
    PORT_STATUS_REGISTER_CONNECTED_MASK = (1 << 0)
    PORT_STATUS_REGISTER_ENABLED_MASK = (1 << 2)
    PORT_STATUS_REGISTER_POWERED_MASK = (1 << 12)

    # Two-bit fields within the port status register; each mask is applied
    # _after_ shifting the register right by the matching shift.
    PORT_STATUS_REGISTER_SPEED_SHIFT = 26
    PORT_STATUS_REGISTER_SPEED_MASK = 0b11
    PORT_STATUS_REGISTER_LINE_STATE_SHIFT = 10
    PORT_STATUS_REGISTER_LINE_STATE_MASK = 0b11

    # Printable names for the LINE_STATE_* values below.
    LINE_STATE_NAMES = {
        0: "SE0",
        1: "J",
        2: "K",
        3: "No device / SE1"
    }

    LINE_STATE_SE0 = 0
    LINE_STATE_J = 1
    LINE_STATE_K = 2
    LINE_STATE_SE1 = 3

    # Facedancer-side device speed constants.
    DEVICE_SPEED_LOW = 0
    DEVICE_SPEED_FULL = 1
    DEVICE_SPEED_HIGH = 2
    DEVICE_SPEED_NONE = 3

    # Translates the speed field of the port status register (GreatFET
    # format) into a Facedancer DEVICE_SPEED_* constant.
    STATUS_REG_SPEED_VALUES = {
        0: DEVICE_SPEED_FULL,
        1: DEVICE_SPEED_LOW,
        2: DEVICE_SPEED_HIGH,
        3: DEVICE_SPEED_NONE
    }

    # Printable names for the DEVICE_SPEED_* constants.
    DEVICE_SPEED_NAMES = {
        DEVICE_SPEED_FULL: "Full speed",
        DEVICE_SPEED_LOW: "Low speed",
        DEVICE_SPEED_HIGH: "High speed",
        DEVICE_SPEED_NONE: "Disconnected"
    }

    # NOTE(review): not referenced anywhere in this class; presumably the
    # inverse translation (Facedancer speed -> GreatFET speed) -- confirm.
    SPEED_REQUESTS = {
        0: 1,
        1: 0,
        2: 2,
        3: 3
    }

    # Endpoint directions
    DIRECTION_IN = 0x00
    DIRECTION_OUT = 0x80

    # Endpoint types
    ENDPOINT_TYPE_CONTROL = 0

    # Packet IDs
    PID_SETUP = 2
    PID_OUT = 0
    PID_IN = 1

    @classmethod
    def appropriate_for_environment(cls, backend_name):
        """
        Determines if the current environment seems appropriate
        for using the GreatDancer backend.

        Args:
            backend_name : The backend name requested by the user, if any.

        returns : True iff no other backend was requested, the greatfet
                  module can be imported, and a GreatFET object can be created.
        """

        # Check: if we have a backend name other than greatfet,
        # the user is trying to use something else. Abort!
        if backend_name and backend_name != "greatfet":
            return False

        # If we're not explicitly trying to use something else,
        # see if there's a connected GreatFET.
        try:
            import greatfet
            greatfet.GreatFET()
            return True
        except ImportError:
            log.info("Skipping GreatFET-based devices, as the greatfet python module isn't installed.")
            return False
        except Exception:
            # Any other failure is treated as "no usable GreatFET". We catch
            # Exception rather than everything, so that KeyboardInterrupt and
            # SystemExit still propagate.
            return False

    def __init__(self, verbose=0, quirks=None, autoconnect=True, device=None):
        """
        Sets up a GreatFET-based host connection.

        Args:
            verbose     : Verbosity level; stored as self.verbose.
            quirks      : Accepted for interface compatibility; not used by
                          this class.
            autoconnect : If true, connect() is called before returning.
            device      : An existing GreatFET object to use; if omitted, a
                          new greatfet.GreatFET() is created.
        """
        import greatfet

        if device is None:
            device = greatfet.GreatFET()

        # Store our input args.
        # TODO: pull into base class
        self.device = device
        self.verbose = verbose

        # Grab a reference to our protocol definitions.
        self.vendor_requests = greatfet.protocol.vendor_requests

        if autoconnect:
            self.connect()

    def connect(self, device_speed=None):
        """
        Sets up our host to talk to the device, including turning on VBUS.

        Args:
            device_speed : Currently unused by this implementation.
        """
        self.device.comms._vendor_request_out(self.vendor_requests.USBHOST_CONNECT)

    def bus_reset(self, delay=0.500):
        """
        Issues a "bus reset", requesting that the downstream device reset itself.

        Args:
            delay : The amount of time, in seconds, to wait both before and
                    after the reset request. To be compliant, this should be
                    omitted, or set to 0.1s.
        """

        # Note: we need to wait a reset delay before and after the bus reset.
        # This allows the host to initialize _and_ then allows the device to settle.
        time.sleep(delay)
        self.device.comms._vendor_request_out(self.vendor_requests.USBHOST_BUS_RESET)
        time.sleep(delay)

    @staticmethod
    def _decode_usb_register(transfer_result):
        """
        Decodes a raw 32-bit register value from a form encoded
        for transit as a USB control request.

        Args:
            transfer_result : The value returned by the vendor request; a
                              sequence of byte values, least significant first.

        returns : The raw integer value of the given register.

        Raises a ValueError if the response is empty.
        """
        raw = bytes(transfer_result)

        # An empty response can't be a register value; report it rather than
        # silently decoding it as zero.
        if not raw:
            raise ValueError("cannot decode an empty register response!")

        return int.from_bytes(raw, byteorder='little')

    def _fetch_status_register(self, register_number):
        """
        Fetches a status register from the GreatDancer, and returns it
        as an integer.

        Args:
            register_number : One of the *_STATUS_REG constants.
        """
        raw_status = self.device.comms._vendor_request_in(
            self.vendor_requests.USBHOST_GET_STATUS, index=register_number, length=4)
        return self._decode_usb_register(raw_status)

    def _port_status(self):
        """ Returns the raw state of the port status register. """
        return self._fetch_status_register(self.PORT_STATUS_REG)

    def _get_read_status(self):
        """ Returns the raw state of the read status word. """
        return self._fetch_status_register(self.READ_STATUS_REG)

    def _get_write_status(self):
        """ Returns the raw state of the write status word. """
        return self._fetch_status_register(self.WRITE_STATUS_REG)

    def _wait_for_transfer(self, fetch_status, endpoint_number):
        """
        Polls a transfer status word until the given endpoint reports completion.

        Args:
            fetch_status    : A zero-argument callable returning the raw status
                              word (e.g. self._get_read_status).
            endpoint_number : The endpoint whose flags should be examined. Bit
                              <endpoint_number> is read as its stall flag, and bit
                              <endpoint_number + 16> as its completion flag.

        Raises an IOError if the stall flag is set in the status word that
        reported completion.
        """
        stall_mask = 1 << endpoint_number
        complete_mask = 1 << (endpoint_number + 16)

        # Wait until we get a complete flag in the status register.
        # XXX: This isn't entirely correct-- it'll clear too much status.
        while True:
            status = fetch_status()
            if status & complete_mask:
                break

        if status & stall_mask:
            raise IOError("Stalled!")

    def device_is_connected(self):
        """ Returns true iff a given device is connected. """
        return bool(self._port_status() & self.PORT_STATUS_REGISTER_CONNECTED_MASK)

    def port_is_enabled(self):
        """ Returns true iff the Facedancer host port's enabled. """
        return bool(self._port_status() & self.PORT_STATUS_REGISTER_ENABLED_MASK)

    def port_is_powered(self):
        """ Returns true iff the Facedancer host port's powered. """
        return bool(self._port_status() & self.PORT_STATUS_REGISTER_POWERED_MASK)

    def current_device_speed(self, as_string=False):
        """ Returns the speed of the connected device

        Args:
            as_string : If true, returns the speed as a string for printing; otherwise
                        returns a DEVICE_SPEED_* constant.
        """
        port_speed_raw = \
            (self._port_status() >> self.PORT_STATUS_REGISTER_SPEED_SHIFT) & \
            self.PORT_STATUS_REGISTER_SPEED_MASK

        # Translate from a GreatFET format device speed to a Facedancer one.
        port_speed = self.STATUS_REG_SPEED_VALUES[port_speed_raw]

        if as_string:
            port_speed = self.DEVICE_SPEED_NAMES[port_speed]

        return port_speed

    def current_line_state(self, as_string=False):
        """ Returns the current state of the USB differential pair

        Args:
            as_string : If true, returns the line state as a string for printing;
                        otherwise returns a LINE_STATE_* constant.
        """
        line_state = \
            (self._port_status() >> self.PORT_STATUS_REGISTER_LINE_STATE_SHIFT) & \
            self.PORT_STATUS_REGISTER_LINE_STATE_MASK

        if as_string:
            line_state = self.LINE_STATE_NAMES[line_state]

        return line_state

    def set_up_endpoint(self, endpoint_address_or_object, endpoint_type=None, max_packet_size=None,
                        device_address=None, endpoint_speed=None, handle_data_toggle=None,
                        is_control_endpoint=None):
        """
        Sets up an endpoint for use. Can be used to initialize an endpoint or to update
        its parameters. Two forms exist:

        Args:
            endpoint_object : a USBEndpoint object with the parameters to be populated;
                              its number, direction, transfer type and max packet size
                              are used, and any of the optional arguments below
                              (from device_address onwards) are applied as well.

        or

        Args:
            endpoint_address    : the address of the endpoint to be setup; including the direction bit
            endpoint_type       : one of the ENDPOINT_TYPE constants that specifies the transfer mode on
                                  the endpoint_address; accepted, but not currently included in the
                                  configuration packet
            max_packet_size     : the maximum packet size to be communicated on the given endpoint
            device_address      : the address of the device to be communicated with; if not provided, the last
                                  address will be used.
            endpoint_speed      : the speed of the packets to be communicated on the endpoint; should be a
                                  DEVICE_SPEED_* constant; if not provided, the last device's speed will be used.
            handle_data_toggle  : true iff the hardware should automatically handle selection of data packet PIDs;
                                  defaults to true for everything except control endpoints
            is_control_endpoint : true iff the given endpoint is a control endpoint; defaults to true
                                  for endpoint number 0

        Raises a ValueError if the endpoint number exceeds 15, or if no
        maximum packet size is available.
        """
        if isinstance(endpoint_address_or_object, USBEndpoint):
            endpoint = endpoint_address_or_object

            # Figure out the endpoint address from its direction and number,
            # using the same DIRECTION_* encoding that
            # initialize_control_endpoint() passes to this method.
            # NOTE(review): how the firmware interprets the direction bit
            # is not visible from here -- confirm.
            if endpoint.direction == USBDirection.IN:
                direction_bits = self.DIRECTION_IN
            else:
                direction_bits = self.DIRECTION_OUT
            endpoint_address = endpoint.number | direction_bits

            # Re-enter using the address form, passing along any overrides the
            # caller provided rather than silently dropping them.
            self.set_up_endpoint(endpoint_address, endpoint.transfer_type, endpoint.max_packet_size,
                                 device_address=device_address, endpoint_speed=endpoint_speed,
                                 handle_data_toggle=handle_data_toggle,
                                 is_control_endpoint=is_control_endpoint)
            return

        endpoint_address = endpoint_address_or_object
        endpoint_number = endpoint_address & 0x7f

        if endpoint_number > 15:
            raise ValueError("cannot have an endpoint with a number > 15!")

        # The configuration packet can't be built without a packet size;
        # fail with a clear message instead of an opaque struct.error.
        if max_packet_size is None:
            raise ValueError("a max_packet_size is required to set up an endpoint!")

        # Figure out defaults for any arguments not provided.
        if device_address is None:
            device_address = self.last_device_address
        if endpoint_speed is None:
            endpoint_speed = self.last_device_speed
        if is_control_endpoint is None:
            is_control_endpoint = (endpoint_number == 0)
        if handle_data_toggle is None:
            handle_data_toggle = not is_control_endpoint

        # Figure out which endpoint schedule to use.
        # FIXME: support more than the asynchronous schedule
        endpoint_schedule = 0

        # TODO: do we translate speed requests, here?

        # Issue the configuration packet.
        packet = struct.pack("<BBBBBHB", endpoint_schedule, device_address, endpoint_address,
                             endpoint_speed, is_control_endpoint, max_packet_size, handle_data_toggle)
        self.device.comms._vendor_request_out(self.vendor_requests.USBHOST_SET_UP_ENDPOINT, data=packet)

    def initialize_control_endpoint(self, device_address=None, device_speed=None, max_packet_size=None):
        """
        Set up the device's control endpoint, so we can use it for e.g. enumeration.

        Args:
            device_address  : The address of the device to talk to; if omitted,
                              set_up_endpoint() falls back to the last address used.
            device_speed    : A DEVICE_SPEED_* constant; if omitted,
                              set_up_endpoint() falls back to the last device's speed.
            max_packet_size : The control endpoint's maximum packet size; if
                              omitted, 8 is used when device_speed is
                              DEVICE_SPEED_LOW, and 64 otherwise.
        """

        # If not overridden, apply the specification default maximum packet size.
        # TODO: support high speed devices, here?
        if max_packet_size is None:
            max_packet_size = 8 if device_speed == self.DEVICE_SPEED_LOW else 64

        # Set up both directions on the control endpoint (OUT first, then IN).
        for direction in (self.DIRECTION_OUT, self.DIRECTION_IN):
            self.set_up_endpoint(0 | direction, self.ENDPOINT_TYPE_CONTROL, max_packet_size,
                                 device_address=device_address, endpoint_speed=device_speed)

    def send_on_endpoint(self, endpoint_number, data, is_setup=False,
                         blocking=True, data_packet_pid=0):
        """
        Sends a block of data on the provided endpoints.

        Args:
            endpoint_number : The endpoint number on which to send.
            data            : The data to be transmitted.
            is_setup        : True iff this transfer should begin with a SETUP token.
            blocking        : True iff this transaction should wait for the transaction to
                              complete.
            data_packet_pid : The data packet PID to use (1 or 0). Ignored if the endpoint is
                              set to automatically alternate data PIDs.

        Raises an IOError on a communications error or stall.
        """

        # Determine the PID token with which to start the request...
        pid_token = self.PID_SETUP if is_setup else self.PID_OUT

        # Issue the actual send itself.
        # TODO: validate length
        self.device.comms._vendor_request_out(self.vendor_requests.USBHOST_SEND_ON_ENDPOINT,
                                              index=endpoint_number,
                                              value=(data_packet_pid << 8) | pid_token,
                                              data=data)

        # ... and if we're blocking, also finish it.
        if blocking:
            self._wait_for_transfer(self._get_write_status, endpoint_number)

    def read_from_endpoint(self, endpoint_number, expected_read_size=64, data_packet_pid=0):
        """
        Reads a block of data from the provided endpoint, waiting for the
        read to complete.

        Args:
            endpoint_number    : The endpoint number from which to read.
            expected_read_size : The expected amount of data to be read.
            data_packet_pid    : The data packet PID to use (1 or 0).
                                 Ignored if the endpoint is set to automatically alternate data PIDs.

        returns : The data read, as a bytes object; empty if the device
                  reported that no data is available.

        Raises an IOError on a communications error or stall.
        """

        # Start the request...
        self.device.comms._vendor_request_out(self.vendor_requests.USBHOST_START_NONBLOCKING_READ,
                                              index=(data_packet_pid << 8) | endpoint_number,
                                              value=expected_read_size)

        # ... and then wait for it to finish.
        self._wait_for_transfer(self._get_read_status, endpoint_number)

        # Figure out how much to read.
        raw_length = self.device.comms._vendor_request_in(self.vendor_requests.USBHOST_GET_NONBLOCKING_LENGTH,
                                                          index=endpoint_number, length=4)
        length = self._decode_usb_register(raw_length)

        if self.verbose > 4:
            print("Supposedly, we've got {} bytes of data to read".format(length))

        # If there's no data available, we don't need to waste time reading anything.
        if length == 0:
            return b''

        # Otherwise, read the data from the endpoint and return it.
        data = self.device.comms._vendor_request_in(self.vendor_requests.USBHOST_FINISH_NONBLOCKING_READ,
                                                    index=endpoint_number, length=length)
        return data.tobytes()