Source code for adb.common

# Copyright 2014 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Common code for ADB and Fastboot.

Common usb browsing and usb communication.


.. rubric:: Contents

* :func:`GetInterface`
* :func:`InterfaceMatcher`
* :class:`TcpHandle`

    * :meth:`TcpHandle._connect`
    * :meth:`TcpHandle.BulkRead`
    * :meth:`TcpHandle.BulkWrite`
    * :meth:`TcpHandle.Close`
    * :meth:`TcpHandle.serial_number`
    * :meth:`TcpHandle.Timeout`
    * :meth:`TcpHandle.TimeoutSeconds`

* :class:`UsbHandle`

    * :meth:`UsbHandle.BulkRead`
    * :meth:`UsbHandle.BulkReadAsync`
    * :meth:`UsbHandle.BulkWrite`
    * :meth:`UsbHandle.Close`
    * :meth:`UsbHandle.Find`
    * :meth:`UsbHandle.FindAndOpen`
    * :meth:`UsbHandle.FindDevices`
    * :meth:`UsbHandle.FindFirst`
    * :meth:`UsbHandle.FlushBuffers`
    * :meth:`UsbHandle.Open`
    * :meth:`UsbHandle.port_path`
    * :meth:`UsbHandle.PortPathMatcher`
    * :meth:`UsbHandle.serial_number`
    * :meth:`UsbHandle.SerialMatcher`
    * :meth:`UsbHandle.Timeout`
    * :meth:`UsbHandle.usb_info`

"""

import logging
import platform
import re
import select
import socket
import threading
import weakref

import libusb1
import usb1

try:
    from libusb1 import LIBUSB_ERROR_NOT_FOUND, LIBUSB_ERROR_TIMEOUT  # pylint: disable=ungrouped-imports
except ImportError:  # pragma: no cover
    LIBUSB_ERROR_NOT_FOUND = 'LIBUSB_ERROR_NOT_FOUND'
    LIBUSB_ERROR_TIMEOUT = 'LIBUSB_ERROR_TIMEOUT'

from adb import usb_exceptions


#: Default timeout
DEFAULT_TIMEOUT_MS = 10000

SYSFS_PORT_SPLIT_RE = re.compile("[,/:.-]")

_LOG = logging.getLogger('android_usb')


[docs]def GetInterface(setting): """Get the class, subclass, and protocol for the given USB setting. .. image:: _static/adb.common.GetInterface.CALLER_GRAPH.svg Parameters ---------- setting : TODO TODO Returns ------- TODO TODO TODO TODO TODO TODO """ return (setting.getClass(), setting.getSubClass(), setting.getProtocol())
[docs]def InterfaceMatcher(clazz, subclass, protocol): """Returns a matcher that returns the setting with the given interface. .. image:: _static/adb.common.InterfaceMatcher.CALL_GRAPH.svg Parameters ---------- clazz : TODO TODO subclass : TODO TODO protocol : TODO TODO Returns ------- Matcher : function TODO """ interface = (clazz, subclass, protocol) def Matcher(device): for setting in device.iterSettings(): if GetInterface(setting) == interface: return setting return None return Matcher
[docs]class UsbHandle(object): """USB communication object. Not thread-safe. Handles reading and writing over USB with the proper endpoints, exceptions, and interface claiming. Important methods: * `UsbHandle.FlushBuffers` * `UsbHandle.BulkRead` * `UsbHandle.BulkWrite(bytes data)` .. image:: _static/adb.common.UsbHandle.__init__.CALLER_GRAPH.svg Parameters ---------- device : TODO libusb_device to connect to. setting : TODO libusb setting with the correct endpoints to communicate with. usb_info : TODO, None String describing the usb path/serial/device, for debugging. timeout_ms : TODO, None Timeout in milliseconds for all I/O. Attributes ---------- _device : TODO libusb_device to connect to. _handle : TODO TODO _interface_number : TODO TODO _max_read_packet_len : TODO TODO _read_endpoint : TODO TODO _setting : TODO libusb setting with the correct endpoints to communicate with. _timeout_ms : TODO, None Timeout in milliseconds for all I/O. _usb_info : TODO String describing the usb path/serial/device, for debugging. _write_endpoint : TODO, None TODO """ _HANDLE_CACHE = weakref.WeakValueDictionary() _HANDLE_CACHE_LOCK = threading.Lock() def __init__(self, device, setting, usb_info=None, timeout_ms=None): """Initialize USB Handle.""" self._setting = setting self._device = device self._handle = None self._interface_number = None self._read_endpoint = None self._write_endpoint = None self._usb_info = usb_info or '' self._timeout_ms = timeout_ms if timeout_ms else DEFAULT_TIMEOUT_MS self._max_read_packet_len = 0 @property def usb_info(self): """TODO .. image:: _static/adb.common.UsbHandle.usb_info.CALL_GRAPH.svg .. image:: _static/adb.common.UsbHandle.usb_info.CALLER_GRAPH.svg Returns ------- TODO TODO """ try: sn = self.serial_number except libusb1.USBError: sn = '' if sn and sn != self._usb_info: return '%s %s' % (self._usb_info, sn) return self._usb_info
[docs] def Open(self): """Opens the USB device for this setting, and claims the interface. .. image:: _static/adb.common.UsbHandle.Open.CALL_GRAPH.svg """ # Make sure we close any previous handle open to this usb device. port_path = tuple(self.port_path) with self._HANDLE_CACHE_LOCK: old_handle = self._HANDLE_CACHE.get(port_path) if old_handle is not None: old_handle.Close() self._read_endpoint = None self._write_endpoint = None for endpoint in self._setting.iterEndpoints(): address = endpoint.getAddress() if address & libusb1.USB_ENDPOINT_DIR_MASK: self._read_endpoint = address self._max_read_packet_len = endpoint.getMaxPacketSize() else: self._write_endpoint = address assert self._read_endpoint is not None assert self._write_endpoint is not None handle = self._device.open() iface_number = self._setting.getNumber() try: if (platform.system() != 'Windows' and handle.kernelDriverActive(iface_number)): handle.detachKernelDriver(iface_number) except libusb1.USBError as e: if e.value == LIBUSB_ERROR_NOT_FOUND: _LOG.warning('Kernel driver not found for interface: %s.', iface_number) else: raise handle.claimInterface(iface_number) self._handle = handle self._interface_number = iface_number with self._HANDLE_CACHE_LOCK: self._HANDLE_CACHE[port_path] = self # When this object is deleted, make sure it's closed. weakref.ref(self, self.Close)
@property def serial_number(self): """TODO .. image:: _static/adb.common.UsbHandle.serial_number.CALLER_GRAPH.svg Returns ------- TODO TODO """ return self._device.getSerialNumber() @property def port_path(self): """TODO .. image:: _static/adb.common.UsbHandle.port_path.CALLER_GRAPH.svg Returns ------- TODO TODO """ return [self._device.getBusNumber()] + self._device.getPortNumberList()
[docs] def Close(self): """TODO .. image:: _static/adb.common.UsbHandle.Close.CALL_GRAPH.svg .. image:: _static/adb.common.UsbHandle.Close.CALLER_GRAPH.svg """ if self._handle is None: return try: self._handle.releaseInterface(self._interface_number) self._handle.close() except libusb1.USBError: _LOG.info('USBError while closing handle %s: ', self.usb_info, exc_info=True) finally: self._handle = None
[docs] def Timeout(self, timeout_ms): """TODO .. image:: _static/adb.common.UsbHandle.Timeout.CALLER_GRAPH.svg Returns ------- TODO TODO """ return timeout_ms if timeout_ms is not None else self._timeout_ms
[docs] def FlushBuffers(self): """TODO .. image:: _static/adb.common.UsbHandle.FlushBuffers.CALL_GRAPH.svg Raises ------ adb.usb_exceptions.ReadFailedError TODO """ while True: try: self.BulkRead(self._max_read_packet_len, timeout_ms=10) except usb_exceptions.ReadFailedError as e: if e.usb_error.value == LIBUSB_ERROR_TIMEOUT: break raise
[docs] def BulkWrite(self, data, timeout_ms=None): """TODO .. image:: _static/adb.common.UsbHandle.BulkWrite.CALL_GRAPH.svg Parameters ---------- data : bytes TODO timeout_ms : TODO, None TODO Returns ------- TODO TODO Raises ------ adb.usb_exceptions.WriteFailedError This handle has been closed, probably due to another being opened adb.usb_exceptions.WriteFailedError Could not send data """ if self._handle is None: raise usb_exceptions.WriteFailedError('This handle has been closed, probably due to another being opened.', None) try: return self._handle.bulkWrite(self._write_endpoint, data, timeout=self.Timeout(timeout_ms)) except libusb1.USBError as e: raise usb_exceptions.WriteFailedError('Could not send data to %s (timeout %sms)' % (self.usb_info, self.Timeout(timeout_ms)), e)
[docs] def BulkRead(self, length, timeout_ms=None): """TODO .. image:: _static/adb.common.UsbHandle.BulkRead.CALL_GRAPH.svg .. image:: _static/adb.common.UsbHandle.BulkRead.CALLER_GRAPH.svg Parameters ---------- length : int TODO timeout_ms : TODO, None TODO Returns ------- bytearray TODO Raises ------ usb_exceptions.ReadFailedError Could not receive data """ if self._handle is None: raise usb_exceptions.ReadFailedError('This handle has been closed, probably due to another being opened.', None) try: # python-libusb1 > 1.6 exposes bytearray()s now instead of bytes/str. # To support older and newer versions, we ensure everything's bytearray() # from here on out. return bytearray(self._handle.bulkRead(self._read_endpoint, length, timeout=self.Timeout(timeout_ms))) except libusb1.USBError as e: raise usb_exceptions.ReadFailedError('Could not receive data from %s (timeout %sms)' % (self.usb_info, self.Timeout(timeout_ms)), e)
[docs] def BulkReadAsync(self, length, timeout_ms=None): """TODO Parameters ---------- length : int TODO timeout_ms : TODO, None TODO Raises ------ NotImplementedError This is always raised because this method is not implemented. """ # See: https://pypi.python.org/pypi/libusb1 "Asynchronous I/O" section raise NotImplementedError
[docs] @classmethod def PortPathMatcher(cls, port_path): """Returns a device matcher for the given port path. .. image:: _static/adb.common.UsbHandle.SerialMatcher.CALL_GRAPH.svg .. image:: _static/adb.common.UsbHandle.PortPathMatcher.CALLER_GRAPH.svg Parameters ---------- port_path : TODO TODO Returns ------- function TODO """ if isinstance(port_path, str): # Convert from sysfs path to port_path. port_path = [int(part) for part in SYSFS_PORT_SPLIT_RE.split(port_path)] return lambda device: device.port_path == port_path
[docs] @classmethod def SerialMatcher(cls, serial): """Returns a device matcher for the given serial. .. image:: _static/adb.common.UsbHandle.SerialMatcher.CALLER_GRAPH.svg Parameters ---------- serial : TODO TODO Returns ------- function TODO """ return lambda device: device.serial_number == serial
[docs] @classmethod def FindAndOpen(cls, setting_matcher, port_path=None, serial=None, timeout_ms=None): """TODO .. image:: _static/adb.common.UsbHandle.FindAndOpen.CALL_GRAPH.svg .. image:: _static/adb.common.UsbHandle.FindAndOpen.CALLER_GRAPH.svg Parameters ---------- setting_matcher : TODO TODO port_path : TODO, None TODO serial : TODO, None TODO timeout_ms : TODO, None TODO Returns ------- dev : TODO TODO """ dev = cls.Find(setting_matcher, port_path=port_path, serial=serial, timeout_ms=timeout_ms) dev.Open() dev.FlushBuffers() return dev
[docs] @classmethod def Find(cls, setting_matcher, port_path=None, serial=None, timeout_ms=None): """Gets the first device that matches according to the keyword args. .. image:: _static/adb.common.UsbHandle.Find.CALL_GRAPH.svg .. image:: _static/adb.common.UsbHandle.Find.CALLER_GRAPH.svg Parameters ---------- setting_matcher : TODO TODO port_path : TODO, None TODO serial : TODO, None TODO timeout_ms : TODO, None TODO Returns ------- TODO TODO """ if port_path: device_matcher = cls.PortPathMatcher(port_path) usb_info = port_path elif serial: device_matcher = cls.SerialMatcher(serial) usb_info = serial else: device_matcher = None usb_info = 'first' return cls.FindFirst(setting_matcher, device_matcher, usb_info=usb_info, timeout_ms=timeout_ms)
[docs] @classmethod def FindFirst(cls, setting_matcher, device_matcher=None, **kwargs): """Find and return the first matching device. .. image:: _static/adb.common.UsbHandle.FindFirst.CALL_GRAPH.svg .. image:: _static/adb.common.UsbHandle.FindFirst.CALLER_GRAPH.svg Parameters ---------- setting_matcher : TODO See :meth:`UsbHandle.FindDevices`. device_matcher : TODO See :meth:`UsbHandle.FindDevices`. **kwargs : TODO See :meth:`UsbHandle.FindDevices`. Returns ------- TODO An instance of UsbHandle. Raises ------ adb.usb_exceptions.DeviceNotFoundError Raised if the device is not available. """ try: return next(cls.FindDevices(setting_matcher, device_matcher=device_matcher, **kwargs)) except StopIteration: raise usb_exceptions.DeviceNotFoundError('No device available, or it is in the wrong configuration.')
[docs] @classmethod def FindDevices(cls, setting_matcher, device_matcher=None, usb_info='', timeout_ms=None): """Find and yield the devices that match. .. image:: _static/adb.common.UsbHandle.FindDevices.CALLER_GRAPH.svg Parameters ---------- setting_matcher : TODO Function that returns the setting to use given a ``usb1.USBDevice``, or ``None`` if the device doesn't have a valid setting. device_matcher : TODO, None Function that returns ``True`` if the given ``UsbHandle`` is valid. ``None`` to match any device. usb_info : str Info string describing device(s). timeout_ms : TODO, None Default timeout of commands in milliseconds. Yields ------ TODO UsbHandle instances """ ctx = usb1.USBContext() for device in ctx.getDeviceList(skip_on_error=True): setting = setting_matcher(device) if setting is None: continue handle = cls(device, setting, usb_info=usb_info, timeout_ms=timeout_ms) if device_matcher is None or device_matcher(handle): yield handle
[docs]class TcpHandle(object): """TCP connection object. Provides same interface as `UsbHandle`. .. image:: _static/adb.common.TcpHandle.__init__.CALLER_GRAPH.svg Parameters ---------- serial : str, bytes, bytearray Android device serial of the form "host" or "host:port". (Host may be an IP address or a host name.) timeout_ms : TODO, None TODO Attributes ---------- _connection : TODO, None TODO _serial_number : str ``<host>:<port>`` _timeout_ms : float, None TODO host : str, TODO TODO port : str, int, TODO TODO """ def __init__(self, serial, timeout_ms=None): # if necessary, convert serial to a unicode string if isinstance(serial, (bytes, bytearray)): serial = serial.decode('utf-8') if ':' in serial: self.host, self.port = serial.split(':') else: self.host = serial self.port = 5555 self._connection = None self._serial_number = '%s:%s' % (self.host, self.port) self._timeout_ms = float(timeout_ms) if timeout_ms else None self._connect()
[docs] def _connect(self): """TODO .. image:: _static/adb.common.TcpHandle._connect.CALL_GRAPH.svg """ timeout = self.TimeoutSeconds(self._timeout_ms) self._connection = socket.create_connection((self.host, self.port), timeout=timeout) if timeout: self._connection.setblocking(0)
@property def serial_number(self): """TODO .. image:: _static/adb.common.TcpHandle.serial_number.CALLER_GRAPH.svg Returns ------- self._serial_number : str The ``_serial_number`` attribute (``<host>:<port>``) """ return self._serial_number
[docs] def BulkWrite(self, data, timeout=None): """TODO .. image:: _static/adb.common.TcpHandle.BulkWrite.CALL_GRAPH.svg Parameters ---------- data : TODO TODO timeout : TODO, None TODO Returns ------- TODO TODO Raises ------ adb.usb_exceptions.TcpTimeoutException Sending data timed out. No data was sent. """ t = self.TimeoutSeconds(timeout) _, writeable, _ = select.select([], [self._connection], [], t) if writeable: return self._connection.send(data) msg = 'Sending data to {} timed out after {}s. No data was sent.'.format(self.serial_number, t) raise usb_exceptions.TcpTimeoutException(msg)
[docs] def BulkRead(self, numbytes, timeout=None): """TODO .. image:: _static/adb.common.TcpHandle.BulkRead.CALL_GRAPH.svg Parameters ---------- numbytes : int TODO timeout_ms : TODO, None TODO Returns ------- TODO TODO Raises ------ adb.usb_exceptions.TcpTimeoutException Reading timed out. """ t = self.TimeoutSeconds(timeout) readable, _, _ = select.select([self._connection], [], [], t) if readable: return self._connection.recv(numbytes) msg = 'Reading from {} timed out (Timeout {}s)'.format(self._serial_number, t) raise usb_exceptions.TcpTimeoutException(msg)
[docs] def Timeout(self, timeout_ms): """TODO .. image:: _static/adb.common.TcpHandle.Timeout.CALLER_GRAPH.svg Parameters ---------- timeout_ms : TODO TODO Returns ------- float TODO """ return float(timeout_ms) if timeout_ms is not None else self._timeout_ms
[docs] def TimeoutSeconds(self, timeout_ms): """TODO .. image:: _static/adb.common.TcpHandle.TimeoutSeconds.CALL_GRAPH.svg .. image:: _static/adb.common.TcpHandle.TimeoutSeconds.CALLER_GRAPH.svg Parameters ---------- timeout_ms : TODO TODO Returns ------- float TODO """ timeout = self.Timeout(timeout_ms) return timeout / 1000.0 if timeout is not None else timeout
[docs] def Close(self): """TODO Returns ------- TODO TODO """ return self._connection.close()