"""
Backend for the Hydradancer boards.
Supports 5 endpoints, with addresses between 0 and 7. Supports low, full and high-speed.
"""
import sys
import logging
import time
from array import array
from time import time_ns
from dataclasses import dataclass
from typing import List, Dict, Any
import usb
from usb.util import CTRL_TYPE_VENDOR, CTRL_RECIPIENT_DEVICE, CTRL_IN, CTRL_OUT
from ..core import *
from ..device import USBDevice, USBConfiguration, USBDirection, USBEndpoint
from ..types import DeviceSpeed
from ..logging import log
from .base import FacedancerBackend
[docs]
@dataclass
class HydradancerEvent:
# Events
EVENT_BUS_RESET = 0x0
EVENT_IN_BUFFER_AVAILABLE = 0x1
EVENT_OUT_BUFFER_AVAILABLE = 0x2
EVENT_NAK = 0x3
event_type : int = -1
value : int = -1
[docs]
@staticmethod
def from_bytes(data : bytes):
return HydradancerEvent(event_type = data[0], value = data[1])
def __repr__(self):
return f"event_type {self.event_type} value {self.value}"
[docs]
class HydradancerHostApp(FacedancerApp, FacedancerBackend):
"""
Backend for the HydraUSB3 boards.
"""
app_name = "Hydradancer Host"
MANUFACTURER_STRING = "Quarkslab https://www.quarkslab.com/ & HydraBus https://hydrabus.com/"
# USB directions
HOST_TO_DEVICE = 0
DEVICE_TO_HOST = 1
USB2_MAX_EP_IN = 16
current_setup_req = None
[docs]
def __init__(self, device: USBDevice=None, verbose: int=0, quirks: List[str]=[]):
"""
Initializes the backend.
Args:
device : The device that will act as our Facedancer. (Optional)
verbose : The verbosity level of the given application. (Optional)
quirks : List of USB platform quirks. (Optional)
"""
super().__init__(self)
self.configuration = None
self.pending_control_out_request = None
self.connected_device = None
self.max_ep0_packet_size = None
self.ep_transfer_queue : List[List[Any]] = [[]] * self.USB2_MAX_EP_IN
self.ep_in : Dict[int, USBEndpoint] = {}
self.ep_out : Dict[int, USBEndpoint] = {}
self.api = HydradancerBoard()
self.verbose = verbose
self.api.wait_board_ready()
[docs]
@classmethod
def appropriate_for_environment(cls, backend_name: str) -> bool:
"""
Determines if the current environment seems appropriate
for using this backend.
Args:
backend_name : Backend name being requested. (Optional)
"""
logging.info("this is hydradancer hi")
# Open a connection to the target device...
device = usb.core.find(idVendor=0x16c0, idProduct=0x27d8)
if device is not None and device.manufacturer == cls.MANUFACTURER_STRING and backend_name == "hydradancer":
return True
return False
[docs]
def get_version(self):
"""
Returns information about the active Facedancer version.
"""
raise NotImplementedError
[docs]
def connect(self, usb_device: USBDevice, max_packet_size_ep0: int=64, device_speed: DeviceSpeed=DeviceSpeed.FULL):
"""
Prepares backend to connect to the target host and emulate
a given device.
Args:
usb_device : The USBDevice object that represents the emulated device.
max_packet_size_ep0 : Max packet size for control endpoint.
device_speed : Requested usb speed for the Facedancer board.
"""
self.api.set_endpoint_mapping(0)
if device_speed not in [DeviceSpeed.LOW, DeviceSpeed.FULL, DeviceSpeed.HIGH]:
log.warning(f"Hydradancer only supports USB Low, Full and High Speed. Ignoring requested speed: {device_speed.name}")
self.api.set_usb2_speed(device_speed)
logging.info("connect ...")
self.api.connect()
self.connected_device = usb_device
self.max_ep0_packet_size = max_packet_size_ep0
[docs]
def disconnect(self):
""" Disconnects Facedancer from the target host. """
logging.info("disconnect")
self.configuration = None
self.pending_control_out_request = None
self.connected_device = None
self.max_ep0_packet_size = 0
self.ep_transfer_queue = [[]] * self.USB2_MAX_EP_IN
self.api.disconnect()
[docs]
def reset(self):
"""
Triggers the Facedancer to handle its side of a bus reset.
"""
logging.info("bus reset")
[docs]
def set_address(self, address: int, defer: bool=False):
"""
Sets the device address of the Facedancer. Usually only used during
initial configuration.
Args:
address : The address the Facedancer should assume.
defer : True iff the set_address request should wait for an active transaction to
finish.
"""
logging.info("set address")
self.api.set_address(address, defer)
[docs]
def read_from_endpoint(self, endpoint_number: int) -> bytes:
"""
Reads a block of data from the given endpoint.
Args:
endpoint_number : The number of the OUT endpoint on which data is to be rx'd.
"""
return self.api.read(endpoint_number, blocking=True)
[docs]
def send_on_endpoint(self, endpoint_number: int, data: bytes, blocking: bool=True):
"""
Sends a collection of USB data on a given endpoint.
Args:
endpoint_number : The number of the IN endpoint on which data should be sent.
data : The data to be sent.
blocking : If true, this function should wait for the transfer to complete.
"""
if endpoint_number != 0 and not blocking and not self.api.in_buffer_empty(endpoint_number):
logging.debug(f"Storing {len(data)} on ep {endpoint_number} for later")
self.ep_transfer_queue[endpoint_number].append(data)
return
backup_len = len(data)
max_packet_size = self.max_ep0_packet_size if endpoint_number == 0 else self.ep_in[endpoint_number].max_packet_size
if not data:
self.api.send(endpoint_number, data)
while data:
packet = data[0:max_packet_size]
data = data[len(packet):]
logging.debug(f"Sending {len(packet)} on ep {endpoint_number}")
self.api.send(endpoint_number, packet)
# Many things to take into account here ...
# first, if the len we are sending is a multiple of the max_packet_size, the host will request a ZLP (otherwise, it can't know when the transfer ends)
# however, if the endpoint is endpoint 0, the host knows the size of the transfer in advance so it might not request the ZLP
# this could be solved by using NAKs for EP0 as well (answering by a ZLP if a NAK is received but we already sent everything)
# however, this could add too much latency and make enumeration fail
if endpoint_number == 0 and (backup_len % max_packet_size) == 0 and backup_len > 0 and backup_len != self.current_setup_req.length:
logging.debug(f"Sending ZLP")
self.api.send(endpoint_number, b"") # Sending ZLP
[docs]
def ack_status_stage(self, direction: USBDirection=USBDirection.OUT, endpoint_number:int =0, blocking: bool=False):
"""
Handles the status stage of a correctly completed control request,
by priming the appropriate endpoint to handle the status phase.
Args:
direction : Determines if we're ACK'ing an IN or OUT vendor request.
(This should match the direction of the DATA stage.)
endpoint_number : The endpoint number on which the control request
occurred.
blocking : True if we should wait for the ACK to be fully issued
before returning.
"""
if direction == USBDirection.OUT:
# If this was an OUT request, we'll prime the output buffer to
# respond with the ZLP expected during the status stage.
self.send_on_endpoint(endpoint_number, data=b"", blocking=blocking)
else:
# If this was an IN request, we'll need to set up a transfer descriptor
# so the status phase can operate correctly. This effectively reads the
# zero length packet from the STATUS phase.
self.read_from_endpoint(endpoint_number)
[docs]
def stall_endpoint(self, endpoint_number:int, direction: USBDirection=USBDirection.OUT):
"""
Stalls the provided endpoint, as defined in the USB spec.
Args:
endpoint_number : The number of the endpoint to be stalled.
"""
in_vs_out = "IN" if direction else "OUT"
logging.info(f"Stalling EP {endpoint_number} {in_vs_out}")
self.api.stall_endpoint(endpoint_number, direction)
[docs]
def clear_halt(self, endpoint_number:int, direction: USBDirection):
""" Clears a halt condition on the provided non-control endpoint.
Args:
endpoint_number : The endpoint number
direction : The endpoint direction; or OUT if not provided.
"""
logging.debug(f"Clearing halt for endpoint {endpoint_number}")
self.api.clear_halt(endpoint_number, direction)
[docs]
def service_irqs(self):
"""
Core routine of the Facedancer execution/event loop. Continuously monitors the
Facedancer's execution status, and reacts as events occur.
"""
events = self.api.fetch_events()
if events is not None:
for event in events:
if event is None:
continue
if event.event_type == HydradancerEvent.EVENT_BUS_RESET:
self.handle_bus_reset()
if event.event_type == HydradancerEvent.EVENT_IN_BUFFER_AVAILABLE and event.value != 0 and (event.value in self.ep_in.keys()):
self.connected_device.handle_buffer_empty(self.ep_in[event.value])
self.handle_control_request()
self.handle_data_endpoints()
[docs]
def handle_bus_reset(self):
"""
Triggers Hydradancer to perform its side of a bus reset.
"""
if self.connected_device:
self.connected_device.handle_bus_reset()
else:
self.reset()
[docs]
def handle_data_endpoints(self):
"""
Handle IN or OUT requests on non-control endpoints.
"""
# process ep OUT firsts, transfer is dictated by the host, if there is data available on an ep OUT,
# it should be processed before setting new IN data
for ep_num in self.ep_out:
if self.api.out_buffer_available(ep_num):
data = self.api.read(ep_num)
if data is not None:
self.connected_device.handle_data_available(
ep_num, data.tobytes())
for ep_num, ep in self.ep_in.items():
if self.api.in_buffer_empty(ep_num) and self.api.nak_on_endpoint(ep_num):
if len(self.ep_transfer_queue[ep_num]) != 0:
max_packet_size = ep.max_packet_size
packet = self.ep_transfer_queue[ep_num][0][0:max_packet_size]
self.ep_transfer_queue[ep_num][0] = self.ep_transfer_queue[ep_num][0][len(packet):]
self.api.send(ep_num, packet)
if len(self.ep_transfer_queue[ep_num][0]) == 0:
self.ep_transfer_queue[ep_num].pop(0)
else:
self.connected_device.handle_nak(ep_num)
[docs]
def handle_control_request(self):
if not self.api.control_buffer_available():
return
data = self.api.read(0)
if data is None:
return
logging.debug(
f"CONTROL EP/OUT: -> size {len(data)} {bytes(data)}")
# inspired from moondancer and greatdancer backends
if self.pending_control_out_request is not None:
self.pending_control_out_request.data.extend(data)
all_data_received = len(
self.pending_control_out_request.data) == self.pending_control_out_request.length
is_short_packet = len(data) < self.max_ep0_packet_size
if all_data_received or is_short_packet:
self.connected_device.handle_request(
self.pending_control_out_request)
self.pending_control_out_request = None
elif len(data) > 0:
request = self.connected_device.create_request(data)
is_out = request.get_direction() == self.HOST_TO_DEVICE
has_data = (request.length > 0)
self.current_setup_req = request
if is_out and has_data:
logging.debug("queuing Control OUT req, waiting for more data")
self.pending_control_out_request = request
return
self.connected_device.handle_request(request)
# handle status stage of IN transfer
elif len(data) == 0:
logging.debug("Received ACK for IN Ctrl req")
[docs]
class HydradancerBoardFatalError(Exception):
pass
[docs]
class HydradancerBoard():
"""
Handles the communication with the Hydradancer control board and manages the events it sends.
"""
MAX_PACKET_SIZE = 1024
# USB Vendor Requests codes
ENABLE_USB_CONNECTION = 50
SET_ADDRESS = 51
GET_EVENT = 52
SET_ENDPOINT_MAPPING = 53
DISABLE_USB = 54
SET_SPEED = 55
SET_EP_RESPONSE = 56
CHECK_HYDRADANCER_READY = 57
DO_BUS_RESET = 58
CONFIGURED = 59
CLEAR_HALT = 60
# Facedancer USB2 speed to Hydradancer USB2 speed
facedancer_to_hydradancer_speed = {
DeviceSpeed.LOW : 0,
DeviceSpeed.FULL : 1,
DeviceSpeed.HIGH : 2
}
# Max number of events that can be sent by the board
# This must not be less than what is defined in the firmware
EVENT_QUEUE_SIZE = 100
# Endpoint states on the emulation board
ENDP_STATE_ACK = 0x00
ENDP_STATE_NAK = 0x02
ENDP_STATE_STALL = 0x03
# USB endpoints direction
HOST_TO_DEVICE = 0
DEVICE_TO_HOST = 1
EP_POLL_NUMBER = 1
SUPPORTED_EP_NUM = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
INCOMPATIBLE_EP = [[], [9], [10], [11], [8, 12], [13], [14], [15], [4], [1], [2], [3], [4], [5], [6], [7]]
timeout_ms_poll = 1
[docs]
def reinit(self, keep_ep0:bool = False):
if keep_ep0 and 0 in self.endpoints_mapping:
old_control_ep = self.endpoints_mapping[0]
self.endpoints_mapping = {0: self.endpoints_mapping[0]}
self.reverse_endpoints_mapping = {old_control_ep:0}
else:
self.endpoints_mapping = {} # emulated endpoint -> control board endpoint
self.reverse_endpoints_mapping = {} # control_board_endpoint -> emulated_endpoint
self.events = array('B', [0] * 2 * self.EVENT_QUEUE_SIZE)
# True when SET_CONFIGURATION has been received and the Hydradancer boards are configured
self.configured = False
# 0x00ff (IN status mask, 1 = emulated ep ready for priming), 0xff00 (OUT mask, data received on emulated ep)
self._hydradancer_status_bytes = array('B', [0] * 4)
self.hydradancer_status = {}
self.hydradancer_status["ep_in_status"] = (1 << 0) & 0xff
self.hydradancer_status["ep_out_status"] = 0x00
self.hydradancer_status["ep_in_nak"] = 0x00
[docs]
def __init__(self):
"""
Get handles on the USB control board, and wait for Hydradancer to be ready
"""
self.configured = False
self.endpoints_mapping : Dict[int,int] = {}
self.reinit()
# Open a connection to the target device...
self.device = usb.core.find(idVendor=0x16c0, idProduct=0x27d8)
if self.device is None:
raise HydradancerBoardFatalError("Hydradancer board not found")
if self.device.speed != usb.util.SPEED_SUPER:
raise HydradancerBoardFatalError(
"Hydradancer not detected as USB3 Superspeed")
cfg = self.device.get_active_configuration()
intf = cfg[(0, 0)]
# Detach the device from any kernel driver
for intf in cfg:
if self.device.is_kernel_driver_active(intf.bInterfaceNumber):
try:
self.device.detach_kernel_driver(intf.bInterfaceNumber)
except usb.core.USBError as e:
sys.exit("Could not detach kernel driver from interface({0}): {1}".format(
intf.bInterfaceNumber, str(e)))
# store the different endpoints handles we need
self.ep_in = list(usb.util.find_descriptor(
intf,
find_all=True,
custom_match=lambda e:
usb.util.endpoint_direction(e.bEndpointAddress) ==
usb.util.ENDPOINT_IN and usb.util.endpoint_address(e.bEndpointAddress) !=
self.EP_POLL_NUMBER))
self.ep_in = {usb.util.endpoint_address(
e.bEndpointAddress): e for e in self.ep_in}
self.ep_out = list(usb.util.find_descriptor(
intf,
find_all=True,
custom_match=lambda e:
usb.util.endpoint_direction(e.bEndpointAddress) ==
usb.util.ENDPOINT_OUT and usb.util.endpoint_address(e.bEndpointAddress) !=
self.EP_POLL_NUMBER))
self.ep_out = {usb.util.endpoint_address(
ep.bEndpointAddress): ep for ep in self.ep_out}
# the endpoint on which status information is received
self.ep_poll = usb.util.find_descriptor(
intf,
custom_match=lambda e:
usb.util.endpoint_direction(e.bEndpointAddress) ==
usb.util.ENDPOINT_IN and usb.util.endpoint_address(e.bEndpointAddress) ==
self.EP_POLL_NUMBER)
if len(self.ep_in.keys()) == 0 and len(self.ep_out.keys()) == 0:
logging.info("Dumping device configuration \r\n" + str(cfg))
raise HydradancerBoardFatalError(
"Could not fetch Hydradancer IN and OUT endpoints list")
if len(self.ep_in.keys()) == 0:
logging.info("Dumping device configuration \r\n" + str(cfg))
raise HydradancerBoardFatalError(
"Could not fetch Hydradancer IN endpoints list")
if len(self.ep_out.keys()) == 0:
logging.info("Dumping device configuration \r\n" + str(cfg))
raise HydradancerBoardFatalError(
"Could not fetch Hydradancer OUT endpoints list")
if self.ep_out.keys() != self.ep_in.keys():
logging.info("Dumping device configuration \r\n" + str(cfg))
raise HydradancerBoardFatalError(
f"Hydradancer IN/OUT endpoints pair incomplete \r\nep_in {self.ep_in} \r\nep_out {self.ep_out}")
if self.ep_poll is None:
logging.info("Dumping device configuration \r\n" + str(cfg))
raise HydradancerBoardFatalError(
f"Could not get handle on Hydradancer events endpoint (EP {self.EP_POLL_NUMBER})")
self.endpoints_pool = set(self.ep_in.keys())
# wait until the board is ready, for instance if a disconnect was previously issued
self.wait_board_ready()
[docs]
def connect(self):
"""
Enable the USB2 connection on the emulation board
"""
try:
self.device.ctrl_transfer(
CTRL_TYPE_VENDOR | CTRL_RECIPIENT_DEVICE | CTRL_OUT, self.ENABLE_USB_CONNECTION)
except (usb.core.USBTimeoutError, usb.core.USBError) as exception:
logging.error(exception)
raise HydradancerBoardFatalError("Error, unable to connect") from exception
[docs]
def disconnect(self):
"""
Disable the USB2 connection on the emulation board,
and reset internal states on both control and emulation boards.
"""
try:
self.device.ctrl_transfer(
CTRL_TYPE_VENDOR | CTRL_RECIPIENT_DEVICE | CTRL_OUT, self.DISABLE_USB)
usb.util.dispose_resources(self.device)
except (usb.core.USBTimeoutError, usb.core.USBError) as exception:
logging.error(exception)
raise HydradancerBoardFatalError("Error, unable to disconnect") from exception
[docs]
def wait_board_ready(self):
"""
Wait until the Hydradancer boards are ready, try to disconnect at some point to reset the internal states,
hoping it will be ready next time.
"""
# num of checks before trying to disconnect
max_num_status_ready_before_disconnect = 100
count_status_ready = 0
max_disconnect = 2
count_disconnect = 2
time_after_disconnect_sec = 1
time_between_checks_sec = 0.01
try:
# check if the board is ready a first time
hydradancer_ready = self.device.ctrl_transfer(
CTRL_TYPE_VENDOR | CTRL_RECIPIENT_DEVICE | CTRL_IN, self.CHECK_HYDRADANCER_READY,
data_or_wLength=1, timeout=5)
# repeat max_num_status_ready_before_disconnect times
while (hydradancer_ready is None or hydradancer_ready == 0) and count_disconnect < max_disconnect:
count_status_ready += 1
if count_status_ready % max_num_status_ready_before_disconnect == 0 and \
count_disconnect < max_disconnect:
logging.info(
"This is taking too long, disconnecting again ...")
self.disconnect()
time.sleep(time_after_disconnect_sec)
count_disconnect += 1
hydradancer_ready = self.device.ctrl_transfer(
CTRL_TYPE_VENDOR | CTRL_RECIPIENT_DEVICE | CTRL_IN, self.CHECK_HYDRADANCER_READY,
data_or_wLength=1, timeout=5)
time.sleep(time_between_checks_sec)
# if hydradancer is still not ready
if hydradancer_ready == 0:
raise HydradancerBoardFatalError(
"Hydradancer is not ready, please reset the board")
except (usb.core.USBTimeoutError, usb.core.USBError) as exception:
logging.error(exception)
raise HydradancerBoardFatalError(
"USB Error while waiting for Hydradancer go-ahead") from exception
[docs]
def set_endpoint_mapping(self, ep_num):
"""
Maps emulated endpoints (endpoints facing the target) to Facedancer's host endpoints (control board endpoints)
"""
if ep_num not in self.SUPPORTED_EP_NUM:
raise HydradancerBoardFatalError(
f"Endpoint number {ep_num} not supported, supported numbers : {self.SUPPORTED_EP_NUM}")
if len(self.endpoints_mapping.values()) >= len(self.endpoints_pool):
raise HydradancerBoardFatalError(
f"All {len(self.endpoints_pool)} endpoints are already in use (for EP0 included)")
if ep_num not in self.endpoints_mapping:
self.endpoints_mapping[ep_num] = list(
self.endpoints_pool - set(self.endpoints_mapping.values()))[0]
self.reverse_endpoints_mapping[self.endpoints_mapping[ep_num]] = ep_num
try:
self.device.ctrl_transfer(CTRL_TYPE_VENDOR | CTRL_RECIPIENT_DEVICE | CTRL_OUT,
self.SET_ENDPOINT_MAPPING, wValue=(
ep_num & 0x00ff) | ((self.endpoints_mapping[ep_num] << 8) & 0xff00))
except (usb.core.USBTimeoutError, usb.core.USBError) as exception:
logging.error(exception)
raise HydradancerBoardFatalError(
f"Could not set mapping for ep {ep_num}") from exception
[docs]
def set_usb2_speed(self, device_speed: DeviceSpeed=DeviceSpeed.FULL):
"""
Set the speed of the USB2 device. Speed is physically determined by the host,
so the emulation board must be configured.
"""
try:
self.device.ctrl_transfer(
CTRL_TYPE_VENDOR | CTRL_RECIPIENT_DEVICE | CTRL_OUT, self.SET_SPEED, wValue=self.facedancer_to_hydradancer_speed[device_speed] & 0x00ff)
except (usb.core.USBTimeoutError, usb.core.USBError) as exception:
logging.error(exception)
raise HydradancerBoardFatalError("Error, unable to set speed") from exception
[docs]
def clear_halt(self, endpoint_number:int, direction: USBDirection):
try:
self.device.ctrl_transfer(
CTRL_TYPE_VENDOR | CTRL_RECIPIENT_DEVICE | CTRL_OUT, self.CLEAR_HALT, wValue=((endpoint_number & 0xff) | ((direction & 0xff) << 8)))
except (usb.core.USBTimeoutError, usb.core.USBError) as exception:
logging.error(exception)
raise HydradancerBoardFatalError("Error, unable to clear halt on endpoint {endpoint_number} direction {direction}") from exception
[docs]
def set_address(self, address, defer=False):
"""
Set the USB address on the emulation board
"""
try:
self.device.ctrl_transfer(
CTRL_TYPE_VENDOR | CTRL_RECIPIENT_DEVICE | CTRL_OUT, self.SET_ADDRESS, address)
except (usb.core.USBTimeoutError, usb.core.USBError) as exception:
logging.error(exception)
raise HydradancerBoardFatalError(
"Error, unable to set address on emulated device") from exception
[docs]
def stall_endpoint(self, ep_num, direction=0):
"""
Stall the ep_num endpoint on the emulation board.
STALL will be cleared automatically after next SETUP packet received.
"""
# Stall EP
try:
if ep_num == 0:
self.device.ctrl_transfer(
CTRL_TYPE_VENDOR | CTRL_RECIPIENT_DEVICE | CTRL_OUT, self.SET_EP_RESPONSE, wValue=(ep_num | 0 << 7) | (self.ENDP_STATE_STALL << 8) & 0xff00)
self.device.ctrl_transfer(
CTRL_TYPE_VENDOR | CTRL_RECIPIENT_DEVICE | CTRL_OUT, self.SET_EP_RESPONSE, wValue=(ep_num | 1 << 7) | (self.ENDP_STATE_STALL << 8) & 0xff00)
else:
self.device.ctrl_transfer(
CTRL_TYPE_VENDOR | CTRL_RECIPIENT_DEVICE | CTRL_OUT, self.SET_EP_RESPONSE, wValue=(ep_num | direction << 7) | (self.ENDP_STATE_STALL << 8) & 0xff00)
except (usb.core.USBTimeoutError, usb.core.USBError) as exception:
logging.error(exception)
raise HydradancerBoardFatalError(f"Could not stall ep {ep_num}") from exception
[docs]
def send(self, ep_num, data):
"""
Prime target endpoint ep_num.
"""
try:
while not self.in_buffer_empty(ep_num):
events = self.fetch_events()
logging.debug(f"Sending len {len(data)} {data} on ep {ep_num}")
self.ep_out[self.endpoints_mapping[ep_num]].write(
data)
self.hydradancer_status["ep_in_status"] &= ~(0x01 << ep_num)
self.hydradancer_status["ep_in_nak"] &= ~(0x01 << ep_num)
except (usb.core.USBTimeoutError, usb.core.USBError):
logging.error(f"could not send data on ep {ep_num}")
[docs]
def read(self, ep_num, blocking=False):
"""
Read from target endpoint ep_num. If blocking=True, wait until the endpoint's buffer is full.
"""
logging.debug(f"reading from ep {ep_num}")
try:
if blocking:
while not self.out_buffer_available(ep_num):
self.fetch_events()
if self.out_buffer_available(ep_num):
read = self.ep_in[self.endpoints_mapping[ep_num]].read(
self.MAX_PACKET_SIZE)
logging.debug(
f"EP{ep_num}/OUT: <- size {len(read)} {bytes(read)}")
self.hydradancer_status["ep_out_status"] &= ~(0x01 << ep_num)
return read
return None
except (usb.core.USBTimeoutError, usb.core.USBError):
logging.error(f"could not read data from ep {ep_num}")
return None
[docs]
def fetch_events(self):
"""
Poll the status of the endpoints. The state are accumulated (like on the boards),
and cleared when sending or reading data (which will trigger a similar clear on the boards).
Thus, self.ep_status should always be in sync with the endpoint's status on the boards.
"""
try:
# Use the endpoint type that best fits the type of request :
# -> for control requests, polling using ctrl transfers garanties the fastest status update.
# Latency is key in the enumeration phase
# -> for bulk requests, polling using bulk transfers allows for more status updates to be sent,
# thus increasing the speed
# TODO : what about interrupt or isochronous transfers ?
if not self.configured:
read = self.device.ctrl_transfer(
CTRL_TYPE_VENDOR | CTRL_RECIPIENT_DEVICE | CTRL_IN, self.GET_EVENT, data_or_wLength=self.events, timeout=self.timeout_ms_poll)
else:
read = self.ep_poll.read(
self.events, timeout=self.timeout_ms_poll*10)
if read >= 2:
events = []
for i in range(0, read, 2):
event = HydradancerEvent.from_bytes(self.events[i:i+2])
events.append(event)
logging.debug(event)
if event.event_type == HydradancerEvent.EVENT_IN_BUFFER_AVAILABLE:
self.hydradancer_status["ep_in_status"] |= (0x1 << event.value) & 0xff
elif event.event_type == HydradancerEvent.EVENT_OUT_BUFFER_AVAILABLE:
self.hydradancer_status["ep_out_status"] |= (0x1 << event.value) & 0xff
elif event.event_type == HydradancerEvent.EVENT_NAK:
self.hydradancer_status["ep_in_nak"] |= (0x1 << event.value) & 0xff
logging.debug(f"Hydradancer status {self.hydradancer_status}")
return events
return None
except usb.core.USBTimeoutError:
return None
except usb.core.USBError as exception:
logging.error(exception)
raise HydradancerBoardFatalError("USB Error while fetching events") from exception
[docs]
def in_buffer_empty(self, ep_num):
"""
Returns True if the IN buffer for target endpoint ep_num is ready for priming
"""
return self.hydradancer_status["ep_in_status"] & (0x1 << ep_num)
[docs]
def nak_on_endpoint(self, ep_num):
"""
Returns True if the IN Endpoint has sent a NAK (meaning a host has sent an IN request)
"""
return self.hydradancer_status["ep_in_nak"] & (0x1 << ep_num)
[docs]
def out_buffer_available(self, ep_num):
"""
Returns True if the OUT buffer for target endpoint ep_num is full
"""
return self.hydradancer_status["ep_out_status"] & (0x1 << ep_num)
[docs]
def control_buffer_available(self):
"""
Returns True if the control buffer is available. Since this buffer is shared between EP0 IN/EP0 OUT, only the OUT status is used for both.
"""
return self.out_buffer_available(0)