# Copyright 2014 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""ADB protocol implementation.
Implements the ADB protocol as seen in android's adb/adbd binaries, but only the
host side.
.. rubric:: Contents
* :class:`_AdbConnection`
* :meth:`_AdbConnection._Send`
* :meth:`_AdbConnection.Close`
* :meth:`_AdbConnection.Okay`
* :meth:`_AdbConnection.ReadUntil`
* :meth:`_AdbConnection.ReadUntilClose`
* :meth:`_AdbConnection.Write`
* :class:`AdbMessage`
* :meth:`AdbMessage.CalculateChecksum`
* :meth:`AdbMessage.checksum`
* :meth:`AdbMessage.Command`
* :meth:`AdbMessage.Connect`
* :meth:`AdbMessage.InteractiveShellCommand`
* :meth:`AdbMessage.Open`
* :meth:`AdbMessage.Pack`
* :meth:`AdbMessage.Read`
* :meth:`AdbMessage.Send`
* :meth:`AdbMessage.StreamingCommand`
* :meth:`AdbMessage.Unpack`
* :class:`AuthSigner`
* :meth:`AuthSigner.GetPublicKey`
* :meth:`AuthSigner.Sign`
* :func:`find_backspace_runs`
* :class:`InterleavedDataError`
* :class:`InvalidChecksumError`
* :class:`InvalidCommandError`
* :class:`InvalidResponseError`
* :func:`MakeWireIDs`
"""
import struct
import time
from io import BytesIO
from adb import usb_exceptions
#: Maximum amount of data in an ADB packet.  Sent to the device as ``arg1`` of
#: the ``CNXN`` message built in :meth:`AdbMessage.Connect`.
MAX_ADB_DATA = 4096
#: ADB protocol version.  Sent to the device as ``arg0`` of the ``CNXN``
#: message built in :meth:`AdbMessage.Connect`.
VERSION = 0x01000000
#: AUTH constant for ``arg0``: the value :meth:`AdbMessage.Connect` requires on
#: an ``AUTH`` packet from the device before signing that packet's payload.
AUTH_TOKEN = 1
#: AUTH constant for ``arg0``: used by :meth:`AdbMessage.Connect` when the
#: payload is the result of :meth:`AuthSigner.Sign`.
AUTH_SIGNATURE = 2
#: AUTH constant for ``arg0``: used by :meth:`AdbMessage.Connect` when the
#: payload is the result of :meth:`AuthSigner.GetPublicKey` plus a trailing NUL.
AUTH_RSAPUBLICKEY = 3
class InvalidCommandError(Exception):
    """Raised when an invalid or unexpected command arrives from the device.

    .. image:: _static/adb.adb_protocol.InvalidCommandError.CALL_GRAPH.svg

    .. image:: _static/adb.adb_protocol.InvalidCommandError.__init__.CALLER_GRAPH.svg

    Parameters
    ----------
    message : str
        Human readable description of the problem.
    response_header
        The command/header that triggered the error.  When it equals
        ``b'FAIL'`` the message is wrapped to say that the device itself
        reported the failure.
    response_data
        Extra data associated with the response.

    """

    def __init__(self, message, response_header, response_data):
        device_said_fail = response_header == b'FAIL'
        final_message = 'Command failed, device said so. (%s)' % message if device_said_fail else message
        # All three values end up in ``self.args``.
        super(InvalidCommandError, self).__init__(final_message, response_header, response_data)
class InvalidResponseError(Exception):
    """Raised when the device answers one of our commands with an invalid response.

    .. image:: _static/adb.adb_protocol.InvalidResponseError.CALL_GRAPH.svg

    """
class InvalidChecksumError(Exception):
    """Raised when the checksum of received data differs from the expected checksum.

    .. image:: _static/adb.adb_protocol.InvalidChecksumError.CALL_GRAPH.svg

    """
class InterleavedDataError(Exception):
    """Raised when data for another stream shows up; only serially sent commands are supported.

    .. image:: _static/adb.adb_protocol.InterleavedDataError.CALL_GRAPH.svg

    """
def find_backspace_runs(stdout_bytes, start_pos):
    """Locate the first run of consecutive backspace (``0x08``) bytes at or after ``start_pos``.

    .. image:: _static/adb.adb_protocol.find_backspace_runs.CALLER_GRAPH.svg

    Parameters
    ----------
    stdout_bytes : bytes
        The buffer to search.
    start_pos : int
        Index in ``stdout_bytes`` at which the search starts.

    Returns
    -------
    int
        The index (relative to the start of ``stdout_bytes``) of the first
        backspace of the run, or ``-1`` if there is no backspace at or after
        ``start_pos``.
    num_backspaces : int
        The number of consecutive backspaces in the run (``0`` if none found).

    """
    backspace = b'\x08'

    # ``find`` with a start offset avoids copying the tail of the buffer and
    # returns an index that is already relative to the whole buffer.
    first_backspace_pos = stdout_bytes.find(backspace, start_pos)
    if first_backspace_pos == -1:
        return -1, 0

    # Extend the run while staying inside the buffer; a run that ends the data
    # must not index past the last byte.  One-byte slices compare correctly on
    # both Python 2 (``str``) and Python 3 (``bytes``).
    end_backspace_pos = first_backspace_pos + 1
    total_len = len(stdout_bytes)
    while end_backspace_pos < total_len and stdout_bytes[end_backspace_pos:end_backspace_pos + 1] == backspace:
        end_backspace_pos += 1

    num_backspaces = end_backspace_pos - first_backspace_pos
    return first_backspace_pos, num_backspaces
def MakeWireIDs(ids):
    """Build the two-way mapping between command IDs and their integer wire values.

    The wire value of an ID is the little-endian integer formed by its bytes
    (byte ``i`` contributes ``byte << (8 * i)``).

    Parameters
    ----------
    ids : list[bytes]
        The command IDs, e.g. ``b'CNXN'``.

    Returns
    -------
    id_to_wire : dict
        Maps each command ID to its integer wire value.
    wire_to_id : dict
        Maps each integer wire value back to its command ID.

    """
    id_to_wire = {}
    for cmd_id in ids:
        wire_value = 0
        for byte_index, byte in enumerate(bytearray(cmd_id)):
            wire_value += byte << (byte_index * 8)
        id_to_wire[cmd_id] = wire_value

    # Invert in a second pass over the finished mapping.
    wire_to_id = {}
    for cmd_id, wire_value in id_to_wire.items():
        wire_to_id[wire_value] = cmd_id

    return id_to_wire, wire_to_id
class AuthSigner(object):
    """Interface for signers used with authenticated ADB (introduced in 4.4.x/KitKat).

    Both methods are placeholders that raise :class:`NotImplementedError`;
    subclasses provide the real implementations.

    """

    def Sign(self, data):
        """Sign the given data using a private key.

        Parameters
        ----------
        data : bytes
            The data to be signed

        Raises
        ------
        NotImplementedError
            Always, in this base class; subclasses implement this method.

        """
        raise NotImplementedError

    def GetPublicKey(self):
        """Return the public key in PEM format without headers or newlines.

        Raises
        ------
        NotImplementedError
            Always, in this base class; subclasses implement this method.

        """
        raise NotImplementedError
class _AdbConnection(object):
    """A single open ADB stream between the host and the device.

    .. image:: _static/adb.adb_protocol._AdbConnection.__init__.CALLER_GRAPH.svg

    Parameters
    ----------
    usb : adb.common.UsbHandle
        The handle that messages are sent over and read from.
    local_id : int
        The ID for the sender; sent as ``arg0`` of every outgoing message.
    remote_id : int
        The ID for the recipient; sent as ``arg1`` of every outgoing message.
    timeout_ms : int
        Timeout in milliseconds for USB packets.

    Attributes
    ----------
    local_id : int
        The ID for the sender
    remote_id : int
        The ID for the recipient
    timeout_ms : int
        Timeout in milliseconds for USB packets.
    usb : adb.common.UsbHandle
        The handle that messages are sent over and read from.

    """

    def __init__(self, usb, local_id, remote_id, timeout_ms):
        self.usb, self.timeout_ms = usb, timeout_ms
        self.local_id, self.remote_id = local_id, remote_id

    def _Send(self, command, arg0, arg1, data=b''):
        """Build an :class:`AdbMessage` and send it over ``self.usb``.

        .. image:: _static/adb.adb_protocol._AdbConnection._Send.CALLER_GRAPH.svg

        Parameters
        ----------
        command : bytes
            The command ID, e.g. ``b'WRTE'``.
        arg0 : int
            First argument of the message.
        arg1 : int
            Second argument of the message.
        data : bytes
            The message payload.

        """
        AdbMessage(command, arg0, arg1, data).Send(self.usb, self.timeout_ms)

    def Write(self, data):
        """Write a packet and expect an Ack.

        .. image:: _static/adb.adb_protocol._AdbConnection.Write.CALL_GRAPH.svg

        Parameters
        ----------
        data : bytes
            The payload of the ``WRTE`` packet.

        Returns
        -------
        int
            ``len(data)``

        Raises
        ------
        usb_exceptions.AdbCommandFailureException
            The command failed.
        adb.adb_protocol.InvalidCommandError
            Expected an OKAY in response to a WRITE, got something else.

        """
        self._Send(b'WRTE', arg0=self.local_id, arg1=self.remote_id, data=data)

        # The device should acknowledge the write.
        reply, reply_data = self.ReadUntil(b'OKAY')
        if reply == b'OKAY':
            return len(data)

        if reply == b'FAIL':
            raise usb_exceptions.AdbCommandFailureException('Command failed.', reply_data)

        raise InvalidCommandError('Expected an OKAY in response to a WRITE, got {0} ({1})'.format(reply, reply_data), reply, reply_data)

    def Okay(self):
        """Send an ``OKAY`` (acknowledgement) message for this connection.

        .. image:: _static/adb.adb_protocol._AdbConnection.Okay.CALL_GRAPH.svg

        .. image:: _static/adb.adb_protocol._AdbConnection.Okay.CALLER_GRAPH.svg

        """
        self._Send(b'OKAY', arg0=self.local_id, arg1=self.remote_id)

    def ReadUntil(self, *expected_cmds):
        """Read a packet, Ack any write packets.

        .. image:: _static/adb.adb_protocol._AdbConnection.ReadUntil.CALL_GRAPH.svg

        .. image:: _static/adb.adb_protocol._AdbConnection.ReadUntil.CALLER_GRAPH.svg

        Parameters
        ----------
        *expected_cmds : bytes
            The command IDs handed to :meth:`AdbMessage.Read` as the commands to wait for.

        Returns
        -------
        cmd : bytes
            The command ID of the received message.
        data : bytes
            The payload of the received message.

        Raises
        ------
        adb.adb_protocol.InterleavedDataError
            We don't support multiple streams...
        adb.adb_protocol.InvalidResponseError
            Incorrect remote id.

        """
        # Incoming messages carry the device's id in arg0 and ours in arg1.
        received = AdbMessage.Read(self.usb, expected_cmds, self.timeout_ms)
        cmd, their_id, our_id, data = received

        if not (our_id == 0 or our_id == self.local_id):
            raise InterleavedDataError("We don't support multiple streams...")

        if not (their_id == 0 or their_id == self.remote_id):
            raise InvalidResponseError('Incorrect remote id, expected {0} got {1}'.format(self.remote_id, their_id))

        if cmd == b'WRTE':
            # Write packets must be acknowledged.
            self.Okay()

        return cmd, data

    def ReadUntilClose(self):
        """Yield packets until a ``b'CLSE'`` packet is received.

        .. image:: _static/adb.adb_protocol._AdbConnection.ReadUntilClose.CALL_GRAPH.svg

        Yields
        ------
        data : bytes
            The payload of each received ``WRTE`` packet.

        Raises
        ------
        usb_exceptions.AdbCommandFailureException
            The command failed.
        adb.adb_protocol.InvalidCommandError
            Expected a ``WRTE`` or ``CLSE`` but received something else.

        """
        while True:
            cmd, data = self.ReadUntil(b'CLSE', b'WRTE')

            if cmd == b'WRTE':
                yield data
                continue

            if cmd == b'CLSE':
                # Answer the close with our own close, then stop.
                self._Send(b'CLSE', arg0=self.local_id, arg1=self.remote_id)
                return

            if cmd == b'FAIL':
                raise usb_exceptions.AdbCommandFailureException('Command failed.', data)

            raise InvalidCommandError('Expected a WRITE or a CLOSE, got {0} ({1})'.format(cmd, data), cmd, data)

    def Close(self):
        """Send a ``CLSE`` message and wait for the device's ``CLSE`` in return.

        .. image:: _static/adb.adb_protocol._AdbConnection.Close.CALL_GRAPH.svg

        .. image:: _static/adb.adb_protocol._AdbConnection.Close.CALLER_GRAPH.svg

        Raises
        ------
        usb_exceptions.AdbCommandFailureException
            Command failed.
        adb.adb_protocol.InvalidCommandError
            Expected a ``CLSE`` response but received something else.

        """
        self._Send(b'CLSE', arg0=self.local_id, arg1=self.remote_id)

        cmd, data = self.ReadUntil(b'CLSE')
        if cmd == b'CLSE':
            return

        if cmd == b'FAIL':
            raise usb_exceptions.AdbCommandFailureException('Command failed.', data)

        raise InvalidCommandError('Expected a CLSE response, got {0} ({1})'.format(cmd, data), cmd, data)
class AdbMessage(object):
    """ADB Protocol and message class.

    .. rubric:: local_id/remote_id

    Turns out the documentation is host/device ambidextrous, so ``local_id`` is the id for 'the sender' and
    ``remote_id`` is for 'the recipient'. So since we're only on the host, we'll re-document with host_id and device_id:

    ::

       OPEN(host_id, 0, 'shell:XXX')
       READY/OKAY(device_id, host_id, '')
       WRITE(0, host_id, 'data')
       CLOSE(device_id, host_id, '')

    .. image:: _static/adb.adb_protocol.AdbMessage.__init__.CALLER_GRAPH.svg

    Parameters
    ----------
    command : bytes, None
        One of: ``[b'SYNC', b'CNXN', b'AUTH', b'OPEN', b'OKAY', b'CLSE', b'WRTE']``.  Any other value
        (including the default ``None``) raises ``KeyError``.
    arg0 : int, None
        First argument of the message; packed as an unsigned 32-bit word.
    arg1 : int, None
        Second argument of the message; packed as an unsigned 32-bit word.
    data : bytes
        The payload, sent right after the packed header.

    Attributes
    ----------
    command : int
        The value in :const:`AdbMessage.commands` that corresponds to the ``command`` parameter
    commands : dict
        A dictionary with keys ``[b'SYNC', b'CNXN', b'AUTH', b'OPEN', b'OKAY', b'CLSE', b'WRTE']``.
    connections : int
        Class-level counter initialised to ``0``; not modified anywhere in this class.
    constants : dict
        A dictionary with values ``[b'SYNC', b'CNXN', b'AUTH', b'OPEN', b'OKAY', b'CLSE', b'WRTE']``.
    format : bytes
        The format for unpacking the ADB message.
    ids : list[bytes]
        ``[b'SYNC', b'CNXN', b'AUTH', b'OPEN', b'OKAY', b'CLSE', b'WRTE']``
    magic : int
        ``command ^ 0xFFFFFFFF``; the last word of the packed header.

    """

    ids = [b'SYNC', b'CNXN', b'AUTH', b'OPEN', b'OKAY', b'CLSE', b'WRTE']
    commands, constants = MakeWireIDs(ids)
    # An ADB message is 6 words in little-endian.
    format = b'<6I'

    connections = 0

    def __init__(self, command=None, arg0=None, arg1=None, data=b''):
        self.command = self.commands[command]
        self.magic = self.command ^ 0xFFFFFFFF
        self.arg0 = arg0
        self.arg1 = arg1
        self.data = data

    @property
    def checksum(self):
        """The checksum of this message's payload.

        .. image:: _static/adb.adb_protocol.AdbMessage.checksum.CALL_GRAPH.svg

        .. image:: _static/adb.adb_protocol.AdbMessage.checksum.CALLER_GRAPH.svg

        Returns
        -------
        int
            ``AdbMessage.CalculateChecksum(self.data)``

        """
        return self.CalculateChecksum(self.data)

    @staticmethod
    def CalculateChecksum(data):
        """Compute the ADB checksum of ``data``: the sum of all its bytes, truncated to 32 bits.

        .. image:: _static/adb.adb_protocol.AdbMessage.CalculateChecksum.CALLER_GRAPH.svg

        Parameters
        ----------
        data : bytes, bytearray, str
            The payload.  Text strings are summed by code point.

        Returns
        -------
        int
            The sum of the byte values, masked with ``0xFFFFFFFF``.

        """
        # The checksum is just a sum of all the bytes. I swear.
        if isinstance(data, (bytes, bytearray)):
            # ``bytearray`` iterates as integers on both Python 2 and Python 3.
            total = sum(bytearray(data))
        else:
            # Unicode strings (should never see?)
            total = sum(map(ord, data))
        return total & 0xFFFFFFFF

    def Pack(self):
        """Returns this message in an over-the-wire format.

        .. image:: _static/adb.adb_protocol.AdbMessage.Pack.CALL_GRAPH.svg

        .. image:: _static/adb.adb_protocol.AdbMessage.Pack.CALLER_GRAPH.svg

        Returns
        -------
        bytes
            The 24-byte header: command, arg0, arg1, payload length, payload checksum and magic,
            each packed as a little-endian unsigned 32-bit word.

        """
        return struct.pack(self.format, self.command, self.arg0, self.arg1, len(self.data), self.checksum, self.magic)

    @classmethod
    def Unpack(cls, message):
        """Split a packed message header into its fields.

        .. image:: _static/adb.adb_protocol.AdbMessage.Unpack.CALLER_GRAPH.svg

        Parameters
        ----------
        message : bytes
            The packed header (six little-endian unsigned 32-bit words).

        Returns
        -------
        cmd : int
            The wire value of the command.
        arg0 : int
            First argument of the message.
        arg1 : int
            Second argument of the message.
        data_length : int
            Length of the payload that follows the header.
        data_checksum : int
            Checksum of the payload that follows the header.

        Raises
        ------
        ValueError
            Unable to unpack the ADB command.

        """
        try:
            # The sixth word (magic) is unpacked but deliberately not returned.
            cmd, arg0, arg1, data_length, data_checksum, unused_magic = struct.unpack(cls.format, message)
        except struct.error as e:
            raise ValueError('Unable to unpack ADB command.', cls.format, message, e)
        return cmd, arg0, arg1, data_length, data_checksum

    def Send(self, usb, timeout_ms=None):
        """Send this message over USB.

        The packed header and the payload are written with two separate ``BulkWrite`` calls.

        .. image:: _static/adb.adb_protocol.AdbMessage.Send.CALL_GRAPH.svg

        Parameters
        ----------
        usb : adb.common.TcpHandle, adb.common.UsbHandle
            A handle with a ``BulkWrite`` method.
        timeout_ms : int, None
            Timeout in milliseconds for USB packets.

        """
        usb.BulkWrite(self.Pack(), timeout_ms)
        usb.BulkWrite(self.data, timeout_ms)

    @classmethod
    def Read(cls, usb, expected_cmds, timeout_ms=None, total_timeout_ms=None):
        """Receive a response from the device.

        .. image:: _static/adb.adb_protocol.AdbMessage.Read.CALL_GRAPH.svg

        .. image:: _static/adb.adb_protocol.AdbMessage.Read.CALLER_GRAPH.svg

        Parameters
        ----------
        usb : adb.common.TcpHandle, adb.common.UsbHandle
            A handle with ``BulkRead`` and ``Timeout`` methods.
        expected_cmds : iterable of bytes
            Read until we receive a header ID that is in ``expected_cmds``
        timeout_ms : int, None
            Timeout in milliseconds for USB packets.
        total_timeout_ms : int, None
            The total time (in milliseconds) to wait for a command in ``expected_cmds``

        Returns
        -------
        command : bytes
            The command ID of the received message.
        arg0 : int
            First argument of the received message.
        arg1 : int
            Second argument of the received message.
        bytes
            The payload of the received message (``b''`` if there is none).

        Raises
        ------
        adb.adb_protocol.InvalidCommandError
            Unknown command *or* never got one of the expected responses.
        adb.adb_protocol.InvalidChecksumError
            Received checksum does not match the expected checksum.

        """
        # presumably ``usb.Timeout`` substitutes the handle's default when given None -- verify in adb.common
        total_timeout_ms = usb.Timeout(total_timeout_ms)
        start = time.time()

        # NOTE(review): the payload of an unexpected message is not consumed before the next
        # header is read -- confirm against the handle's BulkRead semantics.
        while True:
            msg = usb.BulkRead(24, timeout_ms)
            cmd, arg0, arg1, data_length, data_checksum = cls.Unpack(msg)
            command = cls.constants.get(cmd)
            if not command:
                raise InvalidCommandError('Unknown command: %x' % cmd, cmd, (arg0, arg1))

            if command in expected_cmds:
                break

            # ``time.time()`` is in seconds, the limit is in milliseconds.
            elapsed_ms = (time.time() - start) * 1000.0
            if elapsed_ms > total_timeout_ms:
                raise InvalidCommandError('Never got one of the expected responses (%s)' % expected_cmds, cmd, (timeout_ms, total_timeout_ms))

        if data_length > 0:
            data = bytearray()
            # The payload may arrive in several chunks; keep reading until it is complete.
            while data_length > 0:
                temp = usb.BulkRead(data_length, timeout_ms)
                if len(temp) != data_length:
                    print("Data_length {} does not match actual number of bytes read: {}".format(data_length, len(temp)))
                data += temp
                data_length -= len(temp)

            actual_checksum = cls.CalculateChecksum(data)
            if actual_checksum != data_checksum:
                raise InvalidChecksumError('Received checksum {0} != {1}'.format(actual_checksum, data_checksum))

        else:
            data = b''

        return command, arg0, arg1, bytes(data)

    @classmethod
    def Connect(cls, usb, banner=b'notadb', rsa_keys=None, auth_timeout_ms=100):
        """Establish a new connection to the device.

        .. image:: _static/adb.adb_protocol.AdbMessage.Connect.CALL_GRAPH.svg

        Parameters
        ----------
        usb : adb.common.TcpHandle, adb.common.UsbHandle
            A :class:`adb.common.TcpHandle` or :class:`adb.common.UsbHandle` instance with ``BulkRead`` and ``BulkWrite`` methods.
        banner : str, bytes
            A string to send as a host identifier.
        rsa_keys : list[adb_protocol.AuthSigner]
            List of :class:`AuthSigner` subclass instances to be used for authentication. The device can either accept one of
            these via the ``Sign`` method, or we will send the result of ``GetPublicKey`` from the first one if the
            device doesn't accept any of them.
        auth_timeout_ms : int
            Timeout to wait for when sending a new public key. This
            is only relevant when we send a new public key. The device shows a
            dialog and this timeout is how long to wait for that dialog. If used
            in automation, this should be low to catch such a case as a failure
            quickly; while in interactive settings it should be high to allow
            users to accept the dialog. We default to automation here, so it's low
            by default.

        Returns
        -------
        banner : bytes
            The device's reported banner. Always starts with the state (device, recovery, or sideload), sometimes
            includes information after a : with various product information.

        Raises
        ------
        adb.usb_exceptions.DeviceAuthError
            When the device expects authentication, but we weren't given any valid keys.
        adb.adb_protocol.InvalidResponseError
            When the device does authentication in an unexpected way.
        usb_exceptions.ReadFailedError
            A read failure (other than the auth timeout) while waiting for the device to accept the public key.

        """
        # In py3, convert unicode to bytes. In py2, convert str to bytes.
        # It's later joined into a byte string, so in py2, this ends up kind of being a no-op.
        if isinstance(banner, str):
            banner = bytearray(banner, 'utf-8')

        msg = cls(command=b'CNXN', arg0=VERSION, arg1=MAX_ADB_DATA, data=b'host::%s\0' % banner)
        msg.Send(usb)
        cmd, arg0, arg1, banner = cls.Read(usb, [b'CNXN', b'AUTH'])
        if cmd == b'AUTH':
            if not rsa_keys:
                raise usb_exceptions.DeviceAuthError('Device authentication required, no keys available.')

            # Loop through our keys, signing the last 'banner' or token.
            for rsa_key in rsa_keys:
                if arg0 != AUTH_TOKEN:
                    raise InvalidResponseError('Unknown AUTH response: %s %s %s' % (arg0, arg1, banner))

                # Do not mangle the banner property here by converting it to a string
                signed_token = rsa_key.Sign(banner)
                msg = cls(command=b'AUTH', arg0=AUTH_SIGNATURE, arg1=0, data=signed_token)
                msg.Send(usb)
                cmd, arg0, unused_arg1, banner = cls.Read(usb, [b'CNXN', b'AUTH'])
                if cmd == b'CNXN':
                    return banner

            # None of the keys worked, so send a public key.
            msg = cls(command=b'AUTH', arg0=AUTH_RSAPUBLICKEY, arg1=0, data=rsa_keys[0].GetPublicKey() + b'\0')
            msg.Send(usb)
            try:
                cmd, arg0, unused_arg1, banner = cls.Read(usb, [b'CNXN'], timeout_ms=auth_timeout_ms)
            except usb_exceptions.ReadFailedError as e:
                if e.usb_error.value == -7:  # Timeout.
                    raise usb_exceptions.DeviceAuthError('Accept auth key on device, then retry.')
                raise

            # This didn't time-out, so we got a CNXN response.
            return banner

        return banner

    @classmethod
    def Open(cls, usb, destination, timeout_ms=None):
        """Opens a new connection to the device via an ``OPEN`` message.

        Not the same as the posix ``open`` or any other google3 Open methods.

        .. image:: _static/adb.adb_protocol.AdbMessage.Open.CALL_GRAPH.svg

        .. image:: _static/adb.adb_protocol.AdbMessage.Open.CALLER_GRAPH.svg

        Parameters
        ----------
        usb : adb.common.TcpHandle, adb.common.UsbHandle
            A :class:`adb.common.TcpHandle` or :class:`adb.common.UsbHandle` instance with ``BulkRead`` and ``BulkWrite`` methods.
        destination : bytes
            The service:command string.
        timeout_ms : int, None
            Timeout in milliseconds for USB packets.

        Returns
        -------
        _AdbConnection, None
            The opened connection, or ``None`` if the device answered with ``CLSE`` twice
            (i.e., it does not support this service).

        Raises
        ------
        adb.adb_protocol.InvalidResponseError
            Wrong local_id sent to us.
        adb.adb_protocol.InvalidCommandError
            Didn't get a ready response.

        """
        local_id = 1
        msg = cls(command=b'OPEN', arg0=local_id, arg1=0, data=destination + b'\0')
        msg.Send(usb, timeout_ms)
        cmd, remote_id, their_local_id, _ = cls.Read(usb, [b'CLSE', b'OKAY'], timeout_ms=timeout_ms)
        if local_id != their_local_id:
            raise InvalidResponseError('Expected the local_id to be {}, got {}'.format(local_id, their_local_id))

        if cmd == b'CLSE':
            # Some devices seem to be sending CLSE once more after a request, this *should* handle it
            cmd, remote_id, their_local_id, _ = cls.Read(usb, [b'CLSE', b'OKAY'], timeout_ms=timeout_ms)
            # Device doesn't support this service.
            if cmd == b'CLSE':
                return None

        if cmd != b'OKAY':
            raise InvalidCommandError('Expected a ready response, got {}'.format(cmd), cmd, (remote_id, their_local_id))

        return _AdbConnection(usb, local_id, remote_id, timeout_ms)

    @classmethod
    def Command(cls, usb, service, command='', timeout_ms=None):
        """One complete set of USB packets for a single command.

        Sends ``service:command`` in a new connection, reading the data for the
        response. All the data is held in memory, large responses will be slow and
        can fill up memory.

        .. image:: _static/adb.adb_protocol.AdbMessage.Command.CALL_GRAPH.svg

        Parameters
        ----------
        usb : adb.common.TcpHandle, adb.common.UsbHandle
            A :class:`adb.common.TcpHandle` or :class:`adb.common.UsbHandle` instance with ``BulkRead`` and ``BulkWrite`` methods.
        service : bytes
            The service on the device to talk to.
        command : str
            The command to send to the service.
        timeout_ms : int, None
            Timeout in milliseconds for USB packets.

        Returns
        -------
        str
            The response from the service.

        Raises
        ------
        adb.adb_protocol.InterleavedDataError
            Multiple streams running over usb.
        adb.adb_protocol.InvalidCommandError
            Got an unexpected response command, or the device does not support the service.

        """
        return ''.join(cls.StreamingCommand(usb, service, command, timeout_ms))

    @classmethod
    def StreamingCommand(cls, usb, service, command='', timeout_ms=None):
        """One complete set of USB packets for a single command.

        Sends ``service:command`` in a new connection and yields each chunk of
        the response, decoded as UTF-8, as it is received.

        .. image:: _static/adb.adb_protocol.AdbMessage.StreamingCommand.CALL_GRAPH.svg

        .. image:: _static/adb.adb_protocol.AdbMessage.StreamingCommand.CALLER_GRAPH.svg

        Parameters
        ----------
        usb : adb.common.TcpHandle, adb.common.UsbHandle
            A :class:`adb.common.TcpHandle` or :class:`adb.common.UsbHandle` instance with ``BulkRead`` and ``BulkWrite`` methods.
        service : bytes
            The service on the device to talk to.
        command : str
            The command to send to the service.
        timeout_ms : int, None
            Timeout in milliseconds for USB packets.

        Yields
        ------
        str
            The responses from the service.

        Raises
        ------
        adb.adb_protocol.InterleavedDataError
            Multiple streams running over usb.
        adb.adb_protocol.InvalidCommandError
            Got an unexpected response command, or the device does not support the service.

        """
        if not isinstance(command, bytes):
            command = command.encode('utf8')

        destination = b'%s:%s' % (service, command)
        connection = cls.Open(usb, destination=destination, timeout_ms=timeout_ms)

        # ``Open`` returns None when the device refuses the service; fail with a
        # meaningful error instead of an AttributeError on None.
        if connection is None:
            raise InvalidCommandError('Device closed the stream; service not supported: {0!r}'.format(destination), b'CLSE', destination)

        for data in connection.ReadUntilClose():
            yield data.decode('utf8')

    @classmethod
    def InteractiveShellCommand(cls, conn, cmd=None, strip_cmd=True, delim=None, strip_delim=True, clean_stdout=True):
        """Retrieves stdout of the current InteractiveShell and sends a shell command if provided

        TODO: Should we turn this into a yield based function so we can stream all output?

        .. image:: _static/adb.adb_protocol.AdbMessage.InteractiveShellCommand.CALL_GRAPH.svg

        Parameters
        ----------
        conn : AdbConnection
            Instance of AdbConnection
        cmd : str, None
            Command to run on the target.
        strip_cmd : bool
            Strip command name from stdout.
        delim : str, bytes, None
            Delimiter to look for in the output to know when to stop expecting more output (usually the shell prompt)
        strip_delim : bool
            Strip the provided delimiter from the output
        clean_stdout : bool
            Cleanup the stdout stream of any backspaces and the characters that were deleted by the backspace

        Returns
        -------
        stdout : bytes, str
            The stdout from the shell command as bytes.  If an exception occurs while talking to the
            device it is printed and the empty string ``''`` is returned instead.

        """
        if delim is not None and not isinstance(delim, bytes):
            delim = delim.encode('utf-8')

        # Delimiter may be shell@hammerhead:/ $
        # The user or directory could change, making the delimiter something like root@hammerhead:/data/local/tmp $
        # Handle a partial delimiter to search on and clean up
        if delim:
            user_pos = delim.find(b'@')
            dir_pos = delim.rfind(b':/')
            if user_pos != -1 and dir_pos != -1:
                partial_delim = delim[user_pos:dir_pos + 1]  # e.g. @hammerhead:
            else:
                partial_delim = delim
        else:
            partial_delim = None

        stdout = ''
        stdout_stream = BytesIO()
        original_cmd = ''

        try:
            if cmd:
                original_cmd = str(cmd)
                cmd += '\r'  # Required. Send a carriage return right after the cmd
                cmd = cmd.encode('utf8')

                # Send the cmd raw
                conn.Write(cmd)

                if delim:
                    # Expect multiple WRTE cmds until the delim (usually terminal prompt) is detected
                    data = b''
                    while partial_delim not in data:
                        response_cmd, data = conn.ReadUntil(b'WRTE')
                        stdout_stream.write(data)

                else:
                    # Otherwise, expect only a single WRTE
                    response_cmd, data = conn.ReadUntil(b'WRTE')

                    # WRTE cmd from device will follow with stdout data
                    stdout_stream.write(data)

            else:
                # No cmd provided means we should just expect a single line from the terminal. Use this sparingly
                response_cmd, data = conn.ReadUntil(b'WRTE')
                if response_cmd == b'WRTE':
                    # WRTE cmd from device will follow with stdout data
                    stdout_stream.write(data)
                else:
                    print("Unhandled cmd: {}".format(response_cmd))

            cleaned_stdout_stream = BytesIO()
            if clean_stdout:
                stdout_bytes = stdout_stream.getvalue()

                # Walk the buffer one backspace run at a time.  ``find_backspace_runs`` returns
                # positions relative to the whole buffer, so the full buffer is always passed
                # together with the offset to resume from.
                next_start_pos = 0
                while True:
                    run_pos, run_len = find_backspace_runs(stdout_bytes, next_start_pos)
                    if run_pos == -1:
                        break

                    # Keep everything before the run except the ``run_len`` characters that the
                    # backspaces erased; never reach back before the current segment.
                    keep_end = max(next_start_pos, run_pos - run_len)
                    cleaned_stdout_stream.write(stdout_bytes[next_start_pos:keep_end])
                    next_start_pos = run_pos + run_len

                cleaned_stdout_stream.write(stdout_bytes[next_start_pos:])

            else:
                cleaned_stdout_stream.write(stdout_stream.getvalue())

            stdout = cleaned_stdout_stream.getvalue()

            # Strip original cmd that will come back in stdout
            if original_cmd and strip_cmd:
                findstr = original_cmd.encode('utf-8') + b'\r\r\n'
                pos = stdout.find(findstr)
                while pos >= 0:
                    stdout = stdout.replace(findstr, b'')
                    pos = stdout.find(findstr)

                if b'\r\r\n' in stdout:
                    stdout = stdout.split(b'\r\r\n')[1]

            # Strip delim if requested
            # TODO: Handling stripping partial delims here - not a deal breaker the way we're handling it now
            if delim and strip_delim:
                stdout = stdout.replace(delim, b'')

            stdout = stdout.rstrip()

        except Exception as e:  # pylint: disable=broad-except
            # Deliberate best-effort: report the problem and return whatever ``stdout`` holds.
            print("InteractiveShell exception (most likely timeout): {}".format(e))

        return stdout