# Copyright 2014 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Common code for ADB and Fastboot.
Common usb browsing and usb communication.
.. rubric:: Contents
* :func:`GetInterface`
* :func:`InterfaceMatcher`
* :class:`TcpHandle`
* :meth:`TcpHandle._connect`
* :meth:`TcpHandle.BulkRead`
* :meth:`TcpHandle.BulkWrite`
* :meth:`TcpHandle.Close`
* :meth:`TcpHandle.serial_number`
* :meth:`TcpHandle.Timeout`
* :meth:`TcpHandle.TimeoutSeconds`
* :class:`UsbHandle`
* :meth:`UsbHandle.BulkRead`
* :meth:`UsbHandle.BulkReadAsync`
* :meth:`UsbHandle.BulkWrite`
* :meth:`UsbHandle.Close`
* :meth:`UsbHandle.Find`
* :meth:`UsbHandle.FindAndOpen`
* :meth:`UsbHandle.FindDevices`
* :meth:`UsbHandle.FindFirst`
* :meth:`UsbHandle.FlushBuffers`
* :meth:`UsbHandle.Open`
* :meth:`UsbHandle.port_path`
* :meth:`UsbHandle.PortPathMatcher`
* :meth:`UsbHandle.serial_number`
* :meth:`UsbHandle.SerialMatcher`
* :meth:`UsbHandle.Timeout`
* :meth:`UsbHandle.usb_info`
"""
import logging
import platform
import re
import select
import socket
import threading
import weakref
import libusb1
import usb1
try:
from libusb1 import LIBUSB_ERROR_NOT_FOUND, LIBUSB_ERROR_TIMEOUT # pylint: disable=ungrouped-imports
except ImportError: # pragma: no cover
LIBUSB_ERROR_NOT_FOUND = 'LIBUSB_ERROR_NOT_FOUND'
LIBUSB_ERROR_TIMEOUT = 'LIBUSB_ERROR_TIMEOUT'
from adb import usb_exceptions
#: Default timeout
DEFAULT_TIMEOUT_MS = 10000
SYSFS_PORT_SPLIT_RE = re.compile("[,/:.-]")
_LOG = logging.getLogger('android_usb')
[docs]def GetInterface(setting):
"""Get the class, subclass, and protocol for the given USB setting.
.. image:: _static/adb.common.GetInterface.CALLER_GRAPH.svg
Parameters
----------
setting : TODO
TODO
Returns
-------
TODO
TODO
TODO
TODO
TODO
TODO
"""
return (setting.getClass(), setting.getSubClass(), setting.getProtocol())
[docs]def InterfaceMatcher(clazz, subclass, protocol):
"""Returns a matcher that returns the setting with the given interface.
.. image:: _static/adb.common.InterfaceMatcher.CALL_GRAPH.svg
Parameters
----------
clazz : TODO
TODO
subclass : TODO
TODO
protocol : TODO
TODO
Returns
-------
Matcher : function
TODO
"""
interface = (clazz, subclass, protocol)
def Matcher(device):
for setting in device.iterSettings():
if GetInterface(setting) == interface:
return setting
return None
return Matcher
[docs]class UsbHandle(object):
"""USB communication object. Not thread-safe.
Handles reading and writing over USB with the proper endpoints, exceptions,
and interface claiming.
Important methods:
* `UsbHandle.FlushBuffers`
* `UsbHandle.BulkRead`
* `UsbHandle.BulkWrite(bytes data)`
.. image:: _static/adb.common.UsbHandle.__init__.CALLER_GRAPH.svg
Parameters
----------
device : TODO
libusb_device to connect to.
setting : TODO
libusb setting with the correct endpoints to communicate with.
usb_info : TODO, None
String describing the usb path/serial/device, for debugging.
timeout_ms : TODO, None
Timeout in milliseconds for all I/O.
Attributes
----------
_device : TODO
libusb_device to connect to.
_handle : TODO
TODO
_interface_number : TODO
TODO
_max_read_packet_len : TODO
TODO
_read_endpoint : TODO
TODO
_setting : TODO
libusb setting with the correct endpoints to communicate with.
_timeout_ms : TODO, None
Timeout in milliseconds for all I/O.
_usb_info : TODO
String describing the usb path/serial/device, for debugging.
_write_endpoint : TODO, None
TODO
"""
_HANDLE_CACHE = weakref.WeakValueDictionary()
_HANDLE_CACHE_LOCK = threading.Lock()
def __init__(self, device, setting, usb_info=None, timeout_ms=None):
"""Initialize USB Handle."""
self._setting = setting
self._device = device
self._handle = None
self._interface_number = None
self._read_endpoint = None
self._write_endpoint = None
self._usb_info = usb_info or ''
self._timeout_ms = timeout_ms if timeout_ms else DEFAULT_TIMEOUT_MS
self._max_read_packet_len = 0
@property
def usb_info(self):
"""TODO
.. image:: _static/adb.common.UsbHandle.usb_info.CALL_GRAPH.svg
.. image:: _static/adb.common.UsbHandle.usb_info.CALLER_GRAPH.svg
Returns
-------
TODO
TODO
"""
try:
sn = self.serial_number
except libusb1.USBError:
sn = ''
if sn and sn != self._usb_info:
return '%s %s' % (self._usb_info, sn)
return self._usb_info
[docs] def Open(self):
"""Opens the USB device for this setting, and claims the interface.
.. image:: _static/adb.common.UsbHandle.Open.CALL_GRAPH.svg
"""
# Make sure we close any previous handle open to this usb device.
port_path = tuple(self.port_path)
with self._HANDLE_CACHE_LOCK:
old_handle = self._HANDLE_CACHE.get(port_path)
if old_handle is not None:
old_handle.Close()
self._read_endpoint = None
self._write_endpoint = None
for endpoint in self._setting.iterEndpoints():
address = endpoint.getAddress()
if address & libusb1.USB_ENDPOINT_DIR_MASK:
self._read_endpoint = address
self._max_read_packet_len = endpoint.getMaxPacketSize()
else:
self._write_endpoint = address
assert self._read_endpoint is not None
assert self._write_endpoint is not None
handle = self._device.open()
iface_number = self._setting.getNumber()
try:
if (platform.system() != 'Windows' and handle.kernelDriverActive(iface_number)):
handle.detachKernelDriver(iface_number)
except libusb1.USBError as e:
if e.value == LIBUSB_ERROR_NOT_FOUND:
_LOG.warning('Kernel driver not found for interface: %s.', iface_number)
else:
raise
handle.claimInterface(iface_number)
self._handle = handle
self._interface_number = iface_number
with self._HANDLE_CACHE_LOCK:
self._HANDLE_CACHE[port_path] = self
# When this object is deleted, make sure it's closed.
weakref.ref(self, self.Close)
@property
def serial_number(self):
"""TODO
.. image:: _static/adb.common.UsbHandle.serial_number.CALLER_GRAPH.svg
Returns
-------
TODO
TODO
"""
return self._device.getSerialNumber()
@property
def port_path(self):
"""TODO
.. image:: _static/adb.common.UsbHandle.port_path.CALLER_GRAPH.svg
Returns
-------
TODO
TODO
"""
return [self._device.getBusNumber()] + self._device.getPortNumberList()
[docs] def Close(self):
"""TODO
.. image:: _static/adb.common.UsbHandle.Close.CALL_GRAPH.svg
.. image:: _static/adb.common.UsbHandle.Close.CALLER_GRAPH.svg
"""
if self._handle is None:
return
try:
self._handle.releaseInterface(self._interface_number)
self._handle.close()
except libusb1.USBError:
_LOG.info('USBError while closing handle %s: ',
self.usb_info, exc_info=True)
finally:
self._handle = None
[docs] def Timeout(self, timeout_ms):
"""TODO
.. image:: _static/adb.common.UsbHandle.Timeout.CALLER_GRAPH.svg
Returns
-------
TODO
TODO
"""
return timeout_ms if timeout_ms is not None else self._timeout_ms
[docs] def FlushBuffers(self):
"""TODO
.. image:: _static/adb.common.UsbHandle.FlushBuffers.CALL_GRAPH.svg
Raises
------
adb.usb_exceptions.ReadFailedError
TODO
"""
while True:
try:
self.BulkRead(self._max_read_packet_len, timeout_ms=10)
except usb_exceptions.ReadFailedError as e:
if e.usb_error.value == LIBUSB_ERROR_TIMEOUT:
break
raise
[docs] def BulkWrite(self, data, timeout_ms=None):
"""TODO
.. image:: _static/adb.common.UsbHandle.BulkWrite.CALL_GRAPH.svg
Parameters
----------
data : bytes
TODO
timeout_ms : TODO, None
TODO
Returns
-------
TODO
TODO
Raises
------
adb.usb_exceptions.WriteFailedError
This handle has been closed, probably due to another being opened
adb.usb_exceptions.WriteFailedError
Could not send data
"""
if self._handle is None:
raise usb_exceptions.WriteFailedError('This handle has been closed, probably due to another being opened.', None)
try:
return self._handle.bulkWrite(self._write_endpoint, data, timeout=self.Timeout(timeout_ms))
except libusb1.USBError as e:
raise usb_exceptions.WriteFailedError('Could not send data to %s (timeout %sms)' % (self.usb_info, self.Timeout(timeout_ms)), e)
[docs] def BulkRead(self, length, timeout_ms=None):
"""TODO
.. image:: _static/adb.common.UsbHandle.BulkRead.CALL_GRAPH.svg
.. image:: _static/adb.common.UsbHandle.BulkRead.CALLER_GRAPH.svg
Parameters
----------
length : int
TODO
timeout_ms : TODO, None
TODO
Returns
-------
bytearray
TODO
Raises
------
usb_exceptions.ReadFailedError
Could not receive data
"""
if self._handle is None:
raise usb_exceptions.ReadFailedError('This handle has been closed, probably due to another being opened.', None)
try:
# python-libusb1 > 1.6 exposes bytearray()s now instead of bytes/str.
# To support older and newer versions, we ensure everything's bytearray()
# from here on out.
return bytearray(self._handle.bulkRead(self._read_endpoint, length, timeout=self.Timeout(timeout_ms)))
except libusb1.USBError as e:
raise usb_exceptions.ReadFailedError('Could not receive data from %s (timeout %sms)' % (self.usb_info, self.Timeout(timeout_ms)), e)
[docs] def BulkReadAsync(self, length, timeout_ms=None):
"""TODO
Parameters
----------
length : int
TODO
timeout_ms : TODO, None
TODO
Raises
------
NotImplementedError
This is always raised because this method is not implemented.
"""
# See: https://pypi.python.org/pypi/libusb1 "Asynchronous I/O" section
raise NotImplementedError
[docs] @classmethod
def PortPathMatcher(cls, port_path):
"""Returns a device matcher for the given port path.
.. image:: _static/adb.common.UsbHandle.SerialMatcher.CALL_GRAPH.svg
.. image:: _static/adb.common.UsbHandle.PortPathMatcher.CALLER_GRAPH.svg
Parameters
----------
port_path : TODO
TODO
Returns
-------
function
TODO
"""
if isinstance(port_path, str):
# Convert from sysfs path to port_path.
port_path = [int(part) for part in SYSFS_PORT_SPLIT_RE.split(port_path)]
return lambda device: device.port_path == port_path
[docs] @classmethod
def SerialMatcher(cls, serial):
"""Returns a device matcher for the given serial.
.. image:: _static/adb.common.UsbHandle.SerialMatcher.CALLER_GRAPH.svg
Parameters
----------
serial : TODO
TODO
Returns
-------
function
TODO
"""
return lambda device: device.serial_number == serial
[docs] @classmethod
def FindAndOpen(cls, setting_matcher, port_path=None, serial=None, timeout_ms=None):
"""TODO
.. image:: _static/adb.common.UsbHandle.FindAndOpen.CALL_GRAPH.svg
.. image:: _static/adb.common.UsbHandle.FindAndOpen.CALLER_GRAPH.svg
Parameters
----------
setting_matcher : TODO
TODO
port_path : TODO, None
TODO
serial : TODO, None
TODO
timeout_ms : TODO, None
TODO
Returns
-------
dev : TODO
TODO
"""
dev = cls.Find(setting_matcher, port_path=port_path, serial=serial, timeout_ms=timeout_ms)
dev.Open()
dev.FlushBuffers()
return dev
[docs] @classmethod
def Find(cls, setting_matcher, port_path=None, serial=None, timeout_ms=None):
"""Gets the first device that matches according to the keyword args.
.. image:: _static/adb.common.UsbHandle.Find.CALL_GRAPH.svg
.. image:: _static/adb.common.UsbHandle.Find.CALLER_GRAPH.svg
Parameters
----------
setting_matcher : TODO
TODO
port_path : TODO, None
TODO
serial : TODO, None
TODO
timeout_ms : TODO, None
TODO
Returns
-------
TODO
TODO
"""
if port_path:
device_matcher = cls.PortPathMatcher(port_path)
usb_info = port_path
elif serial:
device_matcher = cls.SerialMatcher(serial)
usb_info = serial
else:
device_matcher = None
usb_info = 'first'
return cls.FindFirst(setting_matcher, device_matcher, usb_info=usb_info, timeout_ms=timeout_ms)
[docs] @classmethod
def FindFirst(cls, setting_matcher, device_matcher=None, **kwargs):
"""Find and return the first matching device.
.. image:: _static/adb.common.UsbHandle.FindFirst.CALL_GRAPH.svg
.. image:: _static/adb.common.UsbHandle.FindFirst.CALLER_GRAPH.svg
Parameters
----------
setting_matcher : TODO
See :meth:`UsbHandle.FindDevices`.
device_matcher : TODO
See :meth:`UsbHandle.FindDevices`.
**kwargs : TODO
See :meth:`UsbHandle.FindDevices`.
Returns
-------
TODO
An instance of UsbHandle.
Raises
------
adb.usb_exceptions.DeviceNotFoundError
Raised if the device is not available.
"""
try:
return next(cls.FindDevices(setting_matcher, device_matcher=device_matcher, **kwargs))
except StopIteration:
raise usb_exceptions.DeviceNotFoundError('No device available, or it is in the wrong configuration.')
[docs] @classmethod
def FindDevices(cls, setting_matcher, device_matcher=None,
usb_info='', timeout_ms=None):
"""Find and yield the devices that match.
.. image:: _static/adb.common.UsbHandle.FindDevices.CALLER_GRAPH.svg
Parameters
----------
setting_matcher : TODO
Function that returns the setting to use given a ``usb1.USBDevice``, or ``None``
if the device doesn't have a valid setting.
device_matcher : TODO, None
Function that returns ``True`` if the given ``UsbHandle`` is
valid. ``None`` to match any device.
usb_info : str
Info string describing device(s).
timeout_ms : TODO, None
Default timeout of commands in milliseconds.
Yields
------
TODO
UsbHandle instances
"""
ctx = usb1.USBContext()
for device in ctx.getDeviceList(skip_on_error=True):
setting = setting_matcher(device)
if setting is None:
continue
handle = cls(device, setting, usb_info=usb_info, timeout_ms=timeout_ms)
if device_matcher is None or device_matcher(handle):
yield handle
[docs]class TcpHandle(object):
"""TCP connection object.
Provides same interface as `UsbHandle`.
.. image:: _static/adb.common.TcpHandle.__init__.CALLER_GRAPH.svg
Parameters
----------
serial : str, bytes, bytearray
Android device serial of the form "host" or "host:port". (Host may be an IP address or a host name.)
timeout_ms : TODO, None
TODO
Attributes
----------
_connection : TODO, None
TODO
_serial_number : str
``<host>:<port>``
_timeout_ms : float, None
TODO
host : str, TODO
TODO
port : str, int, TODO
TODO
"""
def __init__(self, serial, timeout_ms=None):
# if necessary, convert serial to a unicode string
if isinstance(serial, (bytes, bytearray)):
serial = serial.decode('utf-8')
if ':' in serial:
self.host, self.port = serial.split(':')
else:
self.host = serial
self.port = 5555
self._connection = None
self._serial_number = '%s:%s' % (self.host, self.port)
self._timeout_ms = float(timeout_ms) if timeout_ms else None
self._connect()
[docs] def _connect(self):
"""TODO
.. image:: _static/adb.common.TcpHandle._connect.CALL_GRAPH.svg
"""
timeout = self.TimeoutSeconds(self._timeout_ms)
self._connection = socket.create_connection((self.host, self.port), timeout=timeout)
if timeout:
self._connection.setblocking(0)
@property
def serial_number(self):
"""TODO
.. image:: _static/adb.common.TcpHandle.serial_number.CALLER_GRAPH.svg
Returns
-------
self._serial_number : str
The ``_serial_number`` attribute (``<host>:<port>``)
"""
return self._serial_number
[docs] def BulkWrite(self, data, timeout=None):
"""TODO
.. image:: _static/adb.common.TcpHandle.BulkWrite.CALL_GRAPH.svg
Parameters
----------
data : TODO
TODO
timeout : TODO, None
TODO
Returns
-------
TODO
TODO
Raises
------
adb.usb_exceptions.TcpTimeoutException
Sending data timed out. No data was sent.
"""
t = self.TimeoutSeconds(timeout)
_, writeable, _ = select.select([], [self._connection], [], t)
if writeable:
return self._connection.send(data)
msg = 'Sending data to {} timed out after {}s. No data was sent.'.format(self.serial_number, t)
raise usb_exceptions.TcpTimeoutException(msg)
[docs] def BulkRead(self, numbytes, timeout=None):
"""TODO
.. image:: _static/adb.common.TcpHandle.BulkRead.CALL_GRAPH.svg
Parameters
----------
numbytes : int
TODO
timeout_ms : TODO, None
TODO
Returns
-------
TODO
TODO
Raises
------
adb.usb_exceptions.TcpTimeoutException
Reading timed out.
"""
t = self.TimeoutSeconds(timeout)
readable, _, _ = select.select([self._connection], [], [], t)
if readable:
return self._connection.recv(numbytes)
msg = 'Reading from {} timed out (Timeout {}s)'.format(self._serial_number, t)
raise usb_exceptions.TcpTimeoutException(msg)
[docs] def Timeout(self, timeout_ms):
"""TODO
.. image:: _static/adb.common.TcpHandle.Timeout.CALLER_GRAPH.svg
Parameters
----------
timeout_ms : TODO
TODO
Returns
-------
float
TODO
"""
return float(timeout_ms) if timeout_ms is not None else self._timeout_ms
[docs] def TimeoutSeconds(self, timeout_ms):
"""TODO
.. image:: _static/adb.common.TcpHandle.TimeoutSeconds.CALL_GRAPH.svg
.. image:: _static/adb.common.TcpHandle.TimeoutSeconds.CALLER_GRAPH.svg
Parameters
----------
timeout_ms : TODO
TODO
Returns
-------
float
TODO
"""
timeout = self.Timeout(timeout_ms)
return timeout / 1000.0 if timeout is not None else timeout
[docs] def Close(self):
"""TODO
Returns
-------
TODO
TODO
"""
return self._connection.close()