#
# This file is part of Facedancer.
#
""" Host support for accessing libusb with a Facedancer-like syntax. """
import sys
import time
import codecs
import struct
import usb
from ..core import *
[docs]
class LibUSBHostApp(FacedancerUSBHost):
"""
Class that represents a libusb-based USB host.
"""
app_name = "LibUSB Host"
[docs]
@classmethod
def appropriate_for_environment(cls, backend_name):
"""
Determines if the current environment seems appropriate
for using the libusb backend.
"""
# For this to work, we need to somehow select a single port.
# The best way to do this is with BUS and PORT, so allow the user
# to select those using the environment.
# TODO: accept these via quirks?
if os.environ.get('LIBUSB_BUS') and os.environ.get('LIBUSB_PORT'):
return True
# As a stand-in, allow use if the user specifies a device's address.
if os.environ.get('LIBUSB_ADDRESS'):
return True
# Never automatically instantiate the libusb backend,
# as it's not a full implementation and requires host-OS oddities.
return False
[docs]
def __init__(self, verbose=0, quirks=[], index=0, **kwargs):
"""
Creates a new libusb backend for communicating with a target device.
"""
self.verbose = verbose
# If we have a specified bus/port, accept them.
# TODO: accept these via quirks?
desired_bus = os.environ.get('LIBUSB_BUS')
desired_port = os.environ.get('LIBUSB_PORT')
if desired_bus and desired_port:
kwargs['bus'] = int(desired_bus)
kwargs['port_number'] = int(desired_port)
# If the user's searching by address, use that.
# TODO: accept these via quirks?
desired_address = os.environ.get('LIBUSB_ADDRESS')
if desired_address:
kwargs['address'] = int(desired_address)
# Open a connection to the target device...
usb_devices = list(usb.core.find(find_all=True, **kwargs))
if len(usb_devices) <= index:
raise DeviceNotFoundError("Could not find a device to connect to via libusb!")
self.device = usb_devices[index]
# Detach any existing drivers, where possible.
try:
index = self.device.get_active_configuration().index
self.device.detach_kernel_driver(index)
except:
# FIXME: note this here, with a warning?
pass
[docs]
def connect(self, device_speed=None):
"""
Sets up our host to talk to the device, including turning on VBUS.
"""
pass
[docs]
def bus_reset(self, delay=0):
"""
Issues a "bus reset", requesting that the downstream device reset itself.
Args:
delay : The amount of time, in seconds, to wait before or after the
reset request. To be compliant, this should be omitted, or set
to 0.1s.
"""
# Note: we need to wait a reset delay before and after the bus reset.
# This allows the host to initialize _and_ then allows the device to settle.
time.sleep(delay)
self.device.reset()
time.sleep(delay)
[docs]
def current_device_speed(self, as_string=False):
""" Returns the speed of the connected device
Args:
as_string : If true, returns the speed as a string for printing; otherwise returns
a DEVICE_SPEED_* constant.
"""
return self.device.speed
[docs]
def current_line_state(self, as_string=False):
""" Returns the current state of the USB differential pair
as_string : If true, returns the speed as a string for printing; otherwise
returns a LINE_STATE_* constant.
"""
return None
[docs]
def device_is_connected(self):
""" Returns true iff a given device is connected. """
return True
[docs]
def port_is_enabled(self):
""" Returns true iff a given device is connected. """
return True
[docs]
def port_is_powered(self):
""" Returns true iff a given device is connected. """
return True
[docs]
def set_up_endpoint(self, endpoint_address_or_object, endpoint_type=None, max_packet_size=None,
device_address=None, endpoint_speed=None, handle_data_toggle=None,
is_control_endpoint=None):
"""
Sets up an endpoint for use. Can be used to initialize an endpoint or to update
its parameters. Two forms exist:
Args:
endpoint_object : a USBEndpoint object with the parameters to be populated
or
Args:
endpoint_address : the address of the endpoint to be setup; including the direction bit
endpoint_type : one of the ENDPOINT_TYPE constants that specifies the transfer mode on
the endpoint_address
max_packet_size : the maximum packet size to be communicated on the given endpoint
device_address : the address of the device to be communicated with; if not provided, the
last address will be used
endpoint_speed : the speed of the packets to be communicated on the endpoint; should be a
DEVICE_SPEED_* constant; if not provided, the last device's speed will be used.
handle_data_toggle : true iff the hardware should automatically handle selection of data packet PIDs
is_control_endpoint : true iff the given packet is a for a control endpoint
"""
# TODO: eventually support hubs / more than one device?
pass
[docs]
def initialize_control_endpoint(self, device_address=None, device_speed=None, max_packet_size=None):
"""
Set up the device's control endpoint, so we can use it for e.g. enumeration.
"""
pass
[docs]
def send_on_endpoint(self, endpoint_number, data, is_setup=False,
blocking=True, data_packet_pid=0):
"""
Sends a block of data on the provided endpoints.
Args:
endpoint_number : The endpoint number on which to send.
data : The data to be transmitted.
is_setup : True iff this transfer should begin with a SETUP token.
blocking : True iff this transaction should wait for the transaction to complete.
data_packet_pid : The data packet PID to use (1 or 0). Ignored if the endpoint is set to automatically
alternate data PIDs.
raises an IOError on a communications error or stall
"""
self.device.write(endpoint_number, data)
[docs]
def read_from_endpoint(self, endpoint_number, expected_read_size=64, data_packet_pid=0):
"""
Sends a block of data on the provided endpoints.
Args:
endpoint_number : The endpoint number on which to send.
expected_read_size : The expected amount of data to be read.
data_packet_pid : The data packet PID to use (1 or 0).
Ignored if the endpoint is set to automatically alternate data PIDs.
raises an IOError on a communications error or stall
"""
data = self.device.read(endpoint_number, expected_read_size)
return data.tobytes()
[docs]
def control_request_in(self, request_type, recipient, request, value=0, index=0, length=0):
""" Performs an IN control request.
Args:
request_type : Determines if this is a standard, class, or vendor request. Accepts a
REQUEST_TYPE_* constant.
recipient : Determines the context in which this command is interpreted. Accepts a
REQUEST_RECIPIENT_* constant.
request : The request number to be performed.
value, index : The standard USB request arguments, to be included in the setup packet.
Their meaning varies depending on the request.
length : The maximum length of data expected in response, or 0 if we don't expect any data back.
"""
request_type = self._build_request_type(True, request_type, recipient)
data = self.device.ctrl_transfer(request_type, request,
value, index, length)
return data.tobytes()
[docs]
def control_request_out(self, request_type, recipient, request, value=0, index=0, data=[]):
""" Performs an OUT control request.
Args:
request_type : Determines if this is a standard, class, or vendor request. Accepts a
REQUEST_TYPE_* constant.
recipient : Determines the context in which this command is interpreted. Accepts a
REQUEST_RECIPIENT_* constant.
request : The request number to be performed.
value, index : The standard USB request arguments, to be included in the setup packet. Their meaning
varies depending on the request.
data : The data to be transmitted with this control request.
"""
request_type = self._build_request_type(True, request_type, recipient)
self.device.ctrl_transfer(request_type, request,
value, index, data)