ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

UI自动化偷懒必备:AirTest封装好ADB命令

2021-12-12 21:35:12  阅读:223  来源: 互联网

标签:shell return package self ADB UI AirTest device adb


文章目录

前言

在UI自动化测试的过程中免不了要跟ADB打交道,原生的ADB命令用是没问题,问题是结合实际业务和效率来说,基本都要封装一下,如果用Python语言的同学就可以偷懒一下,偷懒是为了避免重复造轮子,网易开源的AirTest里有ADB的封装源码,我们拿出来,结合自己需求灵活使用,这里要感谢网易的测开团队。

安装依赖

上面有提到ADB是基于AirTest的,所以要安装AirTest库

pip install -U airtest

源码

注意:如果安装了多个adb版本并出现路径冲突,需通过adb_path参数指定对应的adb路径
使用也比较简单,初始化时传入设备号:serialno就可以,其他参数按需传入
大部分的ADB命令已封装,喜欢的朋友复制拿走

#!/usr/bin/env python
# -*- encoding: utf-8 -*-

import os
import re
import sys
import time
import random
import platform
import warnings
import subprocess
import threading

from six import PY3, text_type, binary_type, raise_from
from six.moves import reduce

from airtest.core.android.constant import (DEFAULT_ADB_PATH, IP_PATTERN,
                                           SDK_VERISON_ANDROID7)
from airtest.core.error import (AdbError, AdbShellError, AirtestError,
                                DeviceConnectionError)
from airtest.utils.compat import decode_path, raisefrom, proc_communicate_timeout, SUBPROCESS_FLAG
from airtest.utils.logger import get_logger
from airtest.utils.nbsp import NonBlockingStreamReader
from airtest.utils.retry import retries
from airtest.utils.snippet import get_std_encoding, reg_cleanup, split_cmd

LOGGING = get_logger(__name__)


class ADB(object):
    """adb client object class"""

    _instances = []
    status_device = "device"
    status_offline = "offline"
    SHELL_ENCODING = "utf-8"

    def __init__(self, serialno=None, adb_path=None, server_addr=None, display_id=None, input_event=None):
        self.serialno = serialno
        self.adb_path = adb_path or self.builtin_adb_path()
        self.display_id = display_id
        self.input_event = input_event
        self._set_cmd_options(server_addr)
        self.connect()
        self._sdk_version = None
        self._line_breaker = None
        self._display_info = {}
        self._display_info_lock = threading.Lock()
        self._forward_local_using = []
        self.__class__._instances.append(self)

    @staticmethod
    def builtin_adb_path():
        """
        Look up the adb executable registered for the current platform

        The `<system>-<machine>` key of `DEFAULT_ADB_PATH` is tried first,
        then the `<system>` key alone.

        Raises:
            RuntimeError: if neither key yields an adb executable

        Returns:
            adb executable path

        """
        system, machine = platform.system(), platform.machine()
        adb_path = DEFAULT_ADB_PATH.get('{}-{}'.format(system, machine)) or DEFAULT_ADB_PATH.get(system)
        if not adb_path:
            raise RuntimeError("No adb executable supports this platform({}-{}).".format(system, machine))

        # overwrite uiautomator adb
        os.environ.pop("ANDROID_HOME", None)
        return adb_path

    def _set_cmd_options(self, server_addr=None):
        """
        Set communication parameters (host and port) between adb server and adb client

        Args:
            server_addr: adb server address, default is 127.0.0.1:5037

        Returns:
            None

        """
        self.host = server_addr[0] if server_addr else "127.0.0.1"
        self.port = server_addr[1] if server_addr else 5037
        self.cmd_options = [self.adb_path]
        if self.host not in ("localhost", "127.0.0.1"):
            self.cmd_options += ['-H', self.host]
        if self.port != 5037:
            self.cmd_options += ['-P', str(self.port)]

    def start_server(self):
        """
        Start the adb server by running `adb start-server` (no device serial is passed)

        Returns:
            standard output of the command

        """
        output = self.cmd("start-server", device=False)
        return output

    def kill_server(self):
        """
        Stop the adb server by running `adb kill-server` (no device serial is passed)

        Returns:
            standard output of the command

        """
        output = self.cmd("kill-server", device=False)
        return output

    def version(self):
        """
        Run `adb version` (no device serial is passed)

        Returns:
            command output with surrounding whitespace stripped

        """
        output = self.cmd("version", device=False)
        return output.strip()

    def start_cmd(self, cmds, device=True):
        """
        Spawn adb with the given command(s) and return the process without waiting for it

        Args:
            cmds: command(s) to be run
            device: if True, `-s serialno` is inserted so the command targets this device

        Raises:
            RuntimeError: if `device` is True and serialno is not specified

        Returns:
            the started subprocess.Popen object with stdin, stdout and stderr piped

        """
        prefix = list(self.cmd_options)
        if device:
            if not self.serialno:
                raise RuntimeError("please set serialno first")
            prefix += ['-s', self.serialno]

        argv = prefix + split_cmd(cmds)
        LOGGING.debug(" ".join(argv))

        if not PY3:
            # on Python 2 every argument is encoded with the stdin encoding
            argv = [arg.encode(get_std_encoding(sys.stdin)) for arg in argv]

        return subprocess.Popen(
            argv,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            creationflags=SUBPROCESS_FLAG
        )

    def cmd(self, cmds, device=True, ensure_unicode=True, timeout=None):
        """
        Run adb command(s) to completion and return the standard output

        Args:
            cmds: command(s) to be run
            device: if True, the device serial number must be specified by -s serialno argument
            ensure_unicode: decode stdout and stderr to text when True, keep raw bytes otherwise
            timeout: timeout in seconds, no timeout is applied when falsy

        Raises:
            DeviceConnectionError: if adb exits with a positive code and stderr matches the
                device connection error pattern
            AdbError: if adb exits with a positive code for any other reason

        Returns:
            command(s) standard output (stdout)

        """
        proc = self.start_cmd(cmds, device)
        if timeout:
            stdout, stderr = proc_communicate_timeout(proc, timeout)
        else:
            stdout, stderr = proc.communicate()

        if ensure_unicode:
            stdout = stdout.decode(get_std_encoding(sys.stdout))
            stderr = stderr.decode(get_std_encoding(sys.stderr))

        if proc.returncode <= 0:
            return stdout

        # failure: tell device connection problems apart from other adb errors
        pattern = DeviceConnectionError.DEVICE_CONNECTION_ERROR
        if isinstance(stderr, binary_type):
            # stderr is still bytes, so the pattern has to be bytes as well
            pattern = pattern.encode("utf-8")
        if re.search(pattern, stderr):
            raise DeviceConnectionError(stderr)
        raise AdbError(stdout, stderr)

    def close_proc_pipe(self, proc):
        """Close the stdin, stdout and stderr pipes of a subprocess.Popen object, skipping unset ones."""
        for name in ("stdin", "stdout", "stderr"):
            stream = getattr(proc, name)
            if stream:
                stream.close()

    def devices(self, state=None):
        """
        Run `adb devices` and parse the listed devices

        Args:
            state: optional state (e.g. "device", "offline"); only devices in this state are returned

        Returns:
            list of (serialno, state) tuples

        """
        # a device line is "<serialno>\t<state>"; header and blank lines do not match
        line_re = re.compile(r'^[\w\d.:-]+\t[\w]+$')
        output = self.cmd("devices", device=False)
        found = []
        for raw_line in output.splitlines():
            entry = raw_line.strip()
            if not (entry and line_re.match(entry)):
                continue
            serialno, cstate = entry.split('\t')
            if not state or cstate == state:
                found.append((serialno, cstate))
        return found

    def connect(self, force=False):
        """
        Run `adb connect` for a remote device (serial number containing ":")

        Nothing is done for other serial numbers, or when the device already
        reports the "device" status and `force` is False.

        Args:
            force: force connection, default is False

        Returns:
            None

        """
        serialno = self.serialno
        if not serialno or ":" not in serialno:
            return
        if force or self.get_status() != "device":
            connect_result = self.cmd("connect %s" % serialno)
            LOGGING.info(connect_result)

    def disconnect(self):
        """
        Run `adb disconnect` for a remote device (serial number containing ":")

        Nothing is done when no serial number is set or when it does not contain ":".

        Returns:
            None

        """
        # guard against serialno=None, same check as in `connect`
        if self.serialno and ":" in self.serialno:
            self.cmd("disconnect %s" % self.serialno)

    def get_status(self):
        """
        Run `adb get-state` and return the device status

        Raises:
            AdbError: if the command fails for a reason other than the device being `not found`

        Returns:
            None if stderr reports `not found`, otherwise the stripped standard output of `adb get-state`

        """
        proc = self.start_cmd("get-state")
        stdout, stderr = proc.communicate()

        stdout = stdout.decode(get_std_encoding(sys.stdout))
        # decode stderr with the stderr stream encoding, as `cmd` does
        stderr = stderr.decode(get_std_encoding(sys.stderr))

        if proc.returncode == 0:
            return stdout.strip()
        elif "not found" in stderr:
            return None
        else:
            raise AdbError(stdout, stderr)

    def wait_for_device(self, timeout=5):
        """
        Run `adb wait-for-device` with a timeout

        Args:
            timeout: maximum time in seconds to wait for the device

        Raises:
            DeviceConnectionError: if the command raises RuntimeError (device not ready)

        Returns:
            None

        """
        try:
            self.cmd("wait-for-device", timeout=timeout)
        except RuntimeError as err:
            # re-raise as a connection error, keeping the original error as the cause
            raisefrom(DeviceConnectionError, "device not ready", err)

    def start_shell(self, cmds):
        """
        Start `adb shell` command(s) without waiting for completion

        Args:
            cmds: adb shell command(s)

        Returns:
            the subprocess returned by `start_cmd`

        """
        shell_cmds = ['shell']
        shell_cmds.extend(split_cmd(cmds))
        return self.start_cmd(shell_cmds)

    def raw_shell(self, cmds, ensure_unicode=True):
        """
        Run `adb shell` command(s) and decode the output with `SHELL_ENCODING`

        Args:
            cmds: adb shell command(s)
            ensure_unicode: decode the output when True (default), return it undecoded otherwise

        Returns:
            command(s) output; if decoding fails a warning is issued and the
            text form of `repr(output)` is returned instead

        """
        shell_cmds = ['shell'] + split_cmd(cmds)
        raw = self.cmd(shell_cmds, ensure_unicode=False)
        if ensure_unicode:
            # use shell encoding to decode output
            try:
                return raw.decode(self.SHELL_ENCODING)
            except UnicodeDecodeError:
                warnings.warn("shell output decode {} fail. repr={}".format(self.SHELL_ENCODING, repr(raw)))
                return text_type(repr(raw))
        return raw

    def shell(self, cmd):
        """
        Run the `adb shell` command on the device

        Args:
            cmd: a command to be run

        Raises:
            AdbShellError: if command return value is non-zero or if any other `AdbError` occurred

        Returns:
            command output

        """
        if self.sdk_version < SDK_VERISON_ANDROID7:
            # for sdk_version < 25, adb shell do not raise error
            # https://stackoverflow.com/questions/9379400/adb-error-codes
            # so the exit status is echoed after the command and parsed from the output
            cmd = split_cmd(cmd) + [";", "echo", "---$?---"]
            out = self.raw_shell(cmd).rstrip()
            # raw string: "\d" in a plain string literal is an invalid escape sequence
            m = re.match(r"(.*)---(\d+)---$", out, re.DOTALL)
            if not m:
                warnings.warn("return code not matched")
                stdout = out
                returncode = 0
            else:
                stdout = m.group(1)
                returncode = int(m.group(2))
            if returncode > 0:
                raise AdbShellError("", stdout)
            return stdout
        else:
            try:
                out = self.raw_shell(cmd)
            except AdbError as err:
                raise AdbShellError(err.stdout, err.stderr)
            else:
                return out

    def keyevent(self, keyname):
        """
        Send a key event with `adb shell input keyevent`

        Args:
            keyname: key event name, it is upper-cased before being sent

        Returns:
            None

        """
        key = keyname.upper()
        self.shell(["input", "keyevent", key])

    def getprop(self, key, strip=True):
        """
        Read a device property with `adb shell getprop`

        Args:
            key: key value for property
            strip: True or False to strip the return carriage and line break from returned string

        Returns:
            property value

        """
        value = self.raw_shell(['getprop', key])
        if not strip:
            return value
        if "\r\r\n" not in value:
            return value.strip("\r\n")
        # Some mobile phones will output multiple lines of extra log,
        # the property is the last non-empty chunk before the trailing separator
        chunks = value.split("\r\r\n")
        return chunks[-2] if len(chunks) > 1 else chunks[-1]

    @property
    @retries(max_tries=3)
    def sdk_version(self):
        """
        SDK version of the device, read from `ro.build.version.sdk` once and then cached

        Returns:
            SDK version as an integer
        """
        cached = self._sdk_version
        if cached is None:
            cached = int(self.getprop('ro.build.version.sdk'))
            self._sdk_version = cached
        return cached

    def push(self, local, remote):
        """
        Copy a local file to the device with `adb push`

        Args:
            local: local file to be copied to the device
            remote: destination on the device where the file will be copied

        Returns:
            None

        """
        push_cmd = ["push", local, remote]
        self.cmd(push_cmd, ensure_unicode=False)

    def pull(self, remote, local):
        """
        Copy a file from the device with `adb pull`

        Args:
            remote: remote file to be downloaded from the device
            local: local destination where the file will be downloaded from the device

        Returns:
            None
        """
        pull_cmd = ["pull", remote, local]
        self.cmd(pull_cmd, ensure_unicode=False)

    def forward(self, local, remote, no_rebind=True):
        """
        Set up a port forward with `adb forward`

        Args:
            local: local tcp port to be forwarded
            remote: tcp port of the device where the local tcp port will be forwarded
            no_rebind: pass `--no-rebind` to adb when True (default)

        Returns:
            None

        """
        cmds = ['forward', '--no-rebind'] if no_rebind else ['forward']
        cmds.extend([local, remote])
        self.cmd(cmds)
        # register for cleanup atexit
        registered = self._forward_local_using
        if local not in registered:
            registered.append(local)

    def get_forwards(self):
        """
        Run `adb forward --list` and iterate over the listed forwards

        Lines that do not consist of exactly three whitespace-separated columns are skipped.

        Yields:
            serial number, local tcp port, remote tcp port

        """
        listing = self.cmd(['forward', '--list'])
        for line in listing.splitlines():
            cols = line.split()
            if len(cols) == 3:
                yield tuple(cols)

    @classmethod
    def get_available_forward_local(cls):
        """
        Pick a pseudo random local port candidate for `adb forward`

        Returns:
            integer between 11111 and 20000 (both inclusive)

        Note:
            the port is not checked here, use `forward --no-rebind` to check if port is available
        """
        lowest, highest = 11111, 20000
        return random.randint(lowest, highest)

    @retries(3)
    def setup_forward(self, device_port):
        """
        Forward a pseudo random local port to the given device port

        Args:
            device_port: it can be string or the value of the `function(localport)`,
                         e.g. `"tcp:5001"` or `"localabstract:{}".format`

        Returns:
            local port and device port

        """
        localport = self.get_available_forward_local()
        # a callable is resolved with the chosen local port
        remote = device_port(localport) if callable(device_port) else device_port
        self.forward("tcp:%s" % localport, remote)
        return localport, remote

    def remove_forward(self, local=None):
        """
        Remove port forwards with `adb forward --remove` / `--remove-all`

        Args:
            local: local tcp port to remove; all forwards are removed when omitted

        Returns:
            None

        """
        cmds = ["forward", "--remove", local] if local else ["forward", "--remove-all"]
        self.cmd(cmds)
        # unregister for cleanup
        registered = self._forward_local_using
        if local in registered:
            registered.remove(local)

    def install_app(self, filepath, replace=False, install_options=None):
        """
        Perform `adb install` command

        Args:
            filepath: full path to file to be installed on the device
            replace: force to replace existing application, default is False
            install_options: list of options, anything that is not a list is ignored

                e.g.["-t",  # allow test packages
                    "-l",  # forward lock application,
                    "-s",  # install application on sdcard,
                    "-d",  # allow version code downgrade (debuggable packages only)
                    "-g",  # grant all runtime permissions
                ]

        Raises:
            RuntimeError: if `filepath` is not an existing file
            AdbShellError: if the command output reports `Failure [...]`

        Returns:
            command output

        """
        if isinstance(filepath, str):
            filepath = decode_path(filepath)

        if not os.path.isfile(filepath):
            raise RuntimeError("file: %s does not exists" % (repr(filepath)))

        # work on a copy so that appending "-r" never mutates the caller's list
        options = list(install_options) if isinstance(install_options, list) else []
        if replace:
            options.append("-r")
        out = self.cmd(["install"] + options + [filepath])

        if re.search(r"Failure \[.*?\]", out):
            raise AdbShellError("Installation Failure", repr(out))

        return out

    def install_multiple_app(self, filepath, replace=False, install_options=None):
        """
            Perform `adb install-multiple` command

            If adb fails with "Failed to finalize session" in stderr, "Success" is returned;
            on any other adb error the installation is retried through `install_app`
            (without `install_options`).

            Args:
                filepath: full path to file to be installed on the device
                replace: force to replace existing application, default is False
                install_options:  list of options, anything that is not a list is ignored
                    e.g.["-t",  # allow test packages
                        "-l",  # forward lock application,
                        "-s",  # install application on sdcard,
                        "-d",  # allow version code downgrade (debuggable packages only)
                        "-g",  # grant all runtime permissions
                        "-p",  # partial application install (install-multiple only)
                    ]

            Raises:
                RuntimeError: if `filepath` is not an existing file
                AdbShellError: if the command output reports `Failure [...]`

            Returns:
                command output
        """
        if isinstance(filepath, str):
            filepath = decode_path(filepath)

        if not os.path.isfile(filepath):
            raise RuntimeError("file: %s does not exists" % (repr(filepath)))

        # work on a copy so that appending "-r" never mutates the caller's list
        options = list(install_options) if isinstance(install_options, list) else []
        if replace:
            options.append("-r")
        cmds = ["install-multiple"] + options + [filepath]

        try:
            out = self.cmd(cmds)
        except AdbError as err:
            if "Failed to finalize session".lower() in err.stderr.lower():
                return "Success"
            else:
                return self.install_app(filepath, replace)

        if re.search(r"Failure \[.*?\]", out):
            raise AdbShellError("Installation Failure", repr(out))

        return out

    def pm_install(self, filepath, replace=False):
        """
        Push the file to `/data/local/tmp` on the device and install it with `pm install`

        Note:
            This is more reliable and recommended way of installing `.apk` files

        Args:
            filepath: full path to file to be installed on the device
            replace: force to replace existing application (`-r`), default is False

        Returns:
            None

        """
        device_dir = "/data/local/tmp"
        filename = os.path.basename(filepath)
        # if the apk file path contains spaces, the path must be escaped
        device_path = '"%s/%s"' % (device_dir, filename)

        print(self.cmd(["push", filepath, device_dir]))

        install_cmd = ['pm', 'install']
        if replace:
            install_cmd.append('-r')
        install_cmd.append(device_path)
        self.shell(install_cmd)

    def uninstall_app(self, package):
        """
        Remove a package from the device with `adb uninstall`

        Args:
            package: package name to be uninstalled from the device

        Returns:
            command output

        """
        output = self.cmd(['uninstall', package])
        return output

    def pm_uninstall(self, package, keepdata=False):
        """
        Perform `adb shell pm uninstall` command

        Args:
            package: package name to be uninstalled from the device
            keepdata: True or False, keep application data after removing the app from the device

        Returns:
            command output

        """
        cmd = ['pm', 'uninstall']
        if keepdata:
            # options go before the package name: `pm uninstall [-k] PACKAGE`
            cmd.append('-k')
        cmd.append(package)
        return self.shell(cmd)

    def snapshot(self):
        """
        Take a screenshot of the device display with `screencap -p`

        When `display_id` is set, it is passed to screencap with `-d`.

        Returns:
            raw command output (stdout) with the device line breaks replaced by "\\n"

        """
        if self.display_id:
            screencap = 'shell screencap -d {0} -p'.format(self.display_id)
        else:
            screencap = 'shell screencap -p'
        raw = self.cmd(screencap, ensure_unicode=False)
        return raw.replace(self.line_breaker, b"\n")

    # PEP 3113 -- Removal of Tuple Parameter Unpacking
    # https://www.python.org/dev/peps/pep-3113/
    def touch(self, tuple_xy):
        """
        Tap the screen at the given coordinates with `input tap`

        Args:
            tuple_xy: coordinates (x, y)

        Returns:
            None

        """
        tap_cmd = 'input tap %d %d' % (tuple_xy[0], tuple_xy[1])
        self.shell(tap_cmd)
        # short pause after the tap
        time.sleep(0.1)

    def swipe(self, tuple_x0y0, tuple_x1y1, duration=500):
        """
        Swipe the screen from a start point to an end point

        Args:
            tuple_x0y0: start point coordinates (x, y)
            tuple_x1y1: end point coordinates (x, y)
            duration: time interval for action, default 500; it is only passed
                to the device for SDK versions above 17

        Raises:
            AirtestError: if SDK version is not supported (<= 15)

        Returns:
            None

        """
        (x0, y0), (x1, y1) = tuple_x0y0, tuple_x1y1

        version = self.sdk_version
        if version <= 15:
            raise AirtestError('swipe: API <= 15 not supported (version=%d)' % version)

        if version <= 17:
            swipe_cmd = 'input swipe %d %d %d %d' % (x0, y0, x1, y1)
        else:
            swipe_cmd = 'input touchscreen swipe %d %d %d %d %d' % (x0, y0, x1, y1, duration)
        self.shell(swipe_cmd)

    def logcat(self, grep_str="", extra_args="", read_timeout=10):
        """
        Perform `adb shell logcat` command and search for given patterns

        The logcat subprocess and its reader are killed when no line arrives
        within `read_timeout`, and also when the generator is closed early or
        an error occurs while it is being consumed.

        Args:
            grep_str: pattern to filter from the logcat output (appended as `| grep <pattern>`)
            extra_args: additional logcat arguments
            read_timeout: time interval to read the logcat, default is 10

        Yields:
            logcat lines containing filtered patterns

        """
        cmds = "shell logcat"
        if extra_args:
            cmds += " " + extra_args
        if grep_str:
            cmds += " | grep " + grep_str
        logcat_proc = self.start_cmd(cmds)
        nbsp = NonBlockingStreamReader(logcat_proc.stdout, print_output=False)
        try:
            while True:
                line = nbsp.readline(read_timeout)
                if line is None:
                    break
                yield line
        finally:
            # always release the reader and the subprocess, even if the consumer
            # stops iterating before the timeout is reached
            nbsp.kill()
            logcat_proc.kill()

    def exists_file(self, filepath):
        """
        Check if the file exists on the device by running `ls` on it

        Args:
            filepath: path to the file

        Returns:
            False if `ls` fails or reports "No such file or directory", True otherwise

        """
        try:
            listing = self.shell(["ls", filepath])
        except AdbShellError:
            return False
        return "No such file or directory" not in listing

    def file_size(self, filepath):
        """
        Get the file size from the output of `ls -l`

        Args:
            filepath: path to the file

        Returns:
            The file size (fifth whitespace-separated column of the `ls -l` output) as an integer

        Raises:
            AdbShellError if no such file
        """
        columns = self.shell(["ls", "-l", filepath]).split()
        return int(columns[4])

    def _cleanup_forwards(self):
        """
        Remove the local forward ports

        Returns:
            None
        """
        for local in self._forward_local_using:
            self.start_cmd(["forward", "--remove", local])

        self._forward_local_using = []

    @property
    def line_breaker(self):
        """
        Line break byte sequence to expect in device output, computed once and cached

        It is `os.linesep` for SDK versions >= `SDK_VERISON_ANDROID7` and
        "\\r" + `os.linesep` for older ones.

        Returns:
            carriage return and line break as ascii-encoded bytes

        """
        if not self._line_breaker:
            prefix = '' if self.sdk_version >= SDK_VERISON_ANDROID7 else '\r'
            self._line_breaker = (prefix + os.linesep).encode("ascii")
        return self._line_breaker

    @property
    def display_info(self):
        """
        Device display properties (orientation, rotation and max values for x and y coordinates)

        The value is obtained from `get_display_info` on first access and cached;
        access is serialized with `_display_info_lock`.

        Returns:
            device screen properties

        """
        # `with` guarantees the lock is released even if get_display_info() raises,
        # otherwise every later access would block forever
        with self._display_info_lock:
            if not self._display_info:
                self._display_info = self.get_display_info()
            return self._display_info

    def get_display_info(self):
        """
        Collect the physical display info, the orientation and the max x / y coordinates

        Returns:
            the dict from `getPhysicalDisplayInfo` extended with `orientation`,
            `rotation` (orientation * 90), `max_x` and `max_y`

        """
        info = self.getPhysicalDisplayInfo()
        orientation = self.getDisplayOrientation()
        max_x, max_y = self.getMaxXY()
        info["orientation"] = orientation
        info["rotation"] = orientation * 90
        info["max_x"] = max_x
        info["max_y"] = max_y
        return info

    def getMaxXY(self):
        """
        Get device display maximum values for x and y coordinates from `getevent -p`

        The `max <n>` value is taken from lines containing "0035" (x) and
        "0036" (y); when several lines match, the last one wins.

        Returns:
            max x and max y coordinates, None for a value that was not found

        """
        max_re = re.compile(r'max [0-9]+')

        def parse_max(text):
            # value of the first "max <n>" in the line, or None
            found = max_re.search(text)
            return int(found.group(0).split()[1]) if found else None

        max_x = max_y = None
        for line in self.shell('getevent -p').split('\n'):
            if "0035" in line:
                value = parse_max(line)
                if value is not None:
                    max_x = value

            if "0036" in line:
                value = parse_max(line)
                if value is not None:
                    max_y = value
        return max_x, max_y

    def getRestrictedScreen(self):
        """
        Get value for mRestrictedScreen (without black border / virtual keyboard)

        Returns:
            screen resolution mRestrictedScreen value as tuple (x, y), or None if
            it is not present in the `dumpsys window` output

        """
        dumpsys_info = self.shell("dumpsys window")
        match = re.search(r'mRestrictedScreen=.+', dumpsys_info)
        if not match:
            return None

        infoline = match.group(0).strip()  # like 'mRestrictedScreen=(0,0) 720x1184'
        resolution = infoline.split(" ")[1].split("x")
        if len(resolution) != 2:
            return None
        width, height = resolution
        return int(str(width)), int(str(height))

    def getPhysicalDisplayInfo(self):
        """
        Get display dimension and density, trying several device sources in turn

        1. `PhysicalDisplayInfo{...}` in the `dumpsys display` output
        2. `mUnrestrictedScreen` or `DisplayWidth`/`DisplayHeight` in the `dumpsys window`
           output, with the density taken from `_getDisplayDensity`
        3. `Physical size` / `Physical density` in the `wm size; wm density` output

        Returns:
            dict with `width`, `height` (int) and `density` (float),
            or an empty dict if none of the sources matched

        """
        # All patterns are raw strings: "\d", "\s" and "\(" inside plain string
        # literals are invalid escape sequences.
        phyDispRE = re.compile(
            r'.*PhysicalDisplayInfo{(?P<width>\d+) x (?P<height>\d+), .*, density (?P<density>[\d.]+).*')
        m = phyDispRE.search(self.raw_shell('dumpsys display'))
        if m:
            return {
                'width': int(m.group('width')),
                'height': int(m.group('height')),
                # In mPhysicalDisplayInfo density is already a factor, no need to calculate
                'density': float(m.group('density')),
            }

        # This could also be mSystem or mOverscanScreen
        phyDispRE = re.compile(
            r'\s*mUnrestrictedScreen=\((?P<x>\d+),(?P<y>\d+)\) (?P<width>\d+)x(?P<height>\d+)')
        # This is known to work on older versions (i.e. API 10) where mrestrictedScreen is not available
        dispWHRE = re.compile(r'\s*DisplayWidth=(?P<width>\d+) *DisplayHeight=(?P<height>\d+)')
        out = self.raw_shell('dumpsys window')
        m = phyDispRE.search(out, 0) or dispWHRE.search(out, 0)
        if m:
            return {
                'width': int(m.group('width')),
                'height': int(m.group('height')),
                # -1.0 when no density information is available
                'density': self._getDisplayDensity(None, strip=True) or -1.0,
            }

        # last resort: the window manager command line tool
        phyDispRE = re.compile(
            r'Physical size: (?P<width>\d+)x(?P<height>\d+).*Physical density: (?P<density>\d+)', re.S)
        m = phyDispRE.search(self.raw_shell('wm size; wm density'))
        if m:
            return {
                'width': int(m.group('width')),
                'height': int(m.group('height')),
                # NOTE(review): this is the raw value printed by `wm density`, not divided
                # by a base dpi as `_getDisplayDensity` does - confirm callers expect this
                'density': float(m.group('density')),
            }

        return {}

    def _getDisplayDensity(self, key, strip=True):
        """
        Get display density

        Args:
            key:
            strip: strip the output

        Returns:
            display density

        """
        BASE_DPI = 160.0
        d = self.getprop('ro.sf.lcd_density', strip)
        if d:
            return float(d) / BASE_DPI
        d = self.getprop('qemu.sf.lcd_density', strip)
        if d:
            return float(d) / BASE_DPI
        return -1.0

    def getDisplayOrientation(self):
        """
        Get the display orientation from `dumpsys SurfaceFlinger`, falling back to `dumpsys input`

        Returns:
            display orientation as an integer; 0 (with a warning) if it cannot be obtained

        """
        # another way to get orientation, for old sumsung device(sdk version 15) from xiaoma
        # (raw strings: "\d" and "\s" in plain string literals are invalid escape sequences)
        SurfaceFlingerRE = re.compile(r'orientation=(\d+)')
        output = self.shell('dumpsys SurfaceFlinger')
        m = SurfaceFlingerRE.search(output)
        if m:
            return int(m.group(1))

        # Fallback method to obtain the orientation
        # See https://github.com/dtmilano/AndroidViewClient/issues/128
        surfaceOrientationRE = re.compile(r'SurfaceOrientation:\s+(\d+)')
        output = self.shell('dumpsys input')
        m = surfaceOrientationRE.search(output)
        if m:
            return int(m.group(1))

        # We couldn't obtain the orientation
        warnings.warn("Could not obtain the orientation, return 0")
        return 0

    def get_top_activity(self):
        """
        Find the top activity in the output of `adb shell dumpsys activity top`

        Raises:
            AirtestError: if top activity cannot be obtained

        Returns:
            top activity as a tuple: (package_name, activity_name, pid)

        """
        dat = self.shell('dumpsys activity top')
        found = re.findall(r'\s*ACTIVITY ([A-Za-z0-9_.$]+)/([A-Za-z0-9_.$]+) \w+ pid=(\d+)', dat)
        if not found:
            raise AirtestError("Can not get top activity, output:%s" % dat)
        # in Android8.0 or higher, the result may be more than one; the last one is returned
        return found[-1]

    def is_keyboard_shown(self):
        """
        Run `adb shell dumpsys input_method` and look for the `mInputShown=true` marker

        Returns:
            True if the marker is present in the output, False otherwise
            (including when the output is empty)

        """
        dump = self.shell('dumpsys input_method')
        if not dump:
            return False
        return "mInputShown=true" in dump

    def is_screenon(self):
        """
        Run `adb shell dumpsys window policy` and read the screen on/off state from its output

        The command is executed once; both known output formats are looked up
        in that single output.

        Raises:
            AirtestError: if neither format is present in the output

        Returns:
            True or False whether the screen is turned on or off

        """
        policy_dump = self.shell('dumpsys window policy')

        found = re.search('mScreenOnFully=(true|false)', policy_dump)
        if found:
            return found.group(1) == 'true'

        # MIUI11
        found = re.search('screenState=(SCREEN_STATE_ON|SCREEN_STATE_OFF)', policy_dump)
        if found:
            return found.group(1) == 'SCREEN_STATE_ON'

        raise AirtestError("Couldn't determine screen ON state")

    def is_locked(self):
        """
        Run `adb shell dumpsys window policy` and read the lock-screen state from its output

        The first of `mShowingLockscreen`, `isStatusBarKeyguard` or `showing`
        followed by `=true` / `=false` decides the result.

        Raises:
            AirtestError: if none of these markers is present in the output

        Returns:
            True or False whether the screen is locked or not

        """
        policy_dump = self.shell('dumpsys window policy')
        found = re.search('(?:mShowingLockscreen|isStatusBarKeyguard|showing)=(true|false)', policy_dump)
        if found is None:
            raise AirtestError("Couldn't determine screen lock state")
        return found.group(1) == 'true'

    def unlock(self):
        """
        Send the MENU key event followed by the BACK key event
        (`adb shell input keyevent ...`) in an attempt to unlock the screen

        Returns:
            None

        Warnings:
            Might not work on all devices

        """
        for command in ('input keyevent MENU', 'input keyevent BACK'):
            self.shell(command)

    def get_package_version(self, package):
        """
        Run `adb shell dumpsys package <package>` and read the first `versionCode=<n>` entry

        Args:
            package: package name

        Returns:
            version code as an int, or None if the output has no `versionCode=` entry

        """
        dump = self.shell(['dumpsys', 'package', package])
        found = re.search(r'versionCode=(\d+)', dump)
        return int(found.group(1)) if found else None

    def list_app(self, third_only=False):
        """
        Run `adb shell pm list packages` and return the listed package names

        Only output lines that start with `package:` are used; blank lines and
        any other text in the output are ignored instead of raising IndexError.

        Args:
            third_only: if True, `-3` is appended to the command
                (per the `pm` help text: only show third party packages)

        Returns:
            list of package names with the `package:` prefix removed

        """
        cmd = ["pm", "list", "packages"]
        if third_only:
            cmd.append("-3")
        output = self.shell(cmd)

        packages = []
        for line in output.splitlines():
            line = line.strip()
            # "package:xxx" -> "xxx"; anything without the prefix is not a package entry
            if line.startswith("package:"):
                packages.append(line.split(":")[1])
        return packages

    def path_app(self, package):
        """
        Run `adb shell pm path <package>` and return the first reported path

        An AdbShellError from the shell call is treated as empty output, so it
        surfaces as the AirtestError below.

        Args:
            package: package name

        Raises:
            AirtestError: if the output contains no `package:` entry

        Returns:
            text between the first `package:` marker and the next one (or the
            end of the output), stripped of surrounding whitespace

        """
        marker = "package:"
        try:
            output = self.shell(['pm', 'path', package])
        except AdbShellError:
            output = ""
        if marker not in output:
            raise AirtestError('package not found, output:[%s]' % output)
        after_first = output.partition(marker)[2]
        return after_first.split(marker)[0].strip()

    def check_app(self, package):
        """
        Run `adb shell dumpsys package <package>` and check that the output
        has a `Package [<package>]` entry

        The package name is matched literally: it is regex-escaped before being
        placed in the pattern, so the dots in the name do not match arbitrary
        characters.

        Args:
            package: package name

        Raises:
            AirtestError: if package is not found

        Returns:
            True if package has been found

        """
        output = self.shell(['dumpsys', 'package', package])
        pattern = r'Package\s+\[' + re.escape(str(package)) + r'\]'
        if re.search(pattern, output) is None:
            raise AirtestError('package "{}" not found'.format(package))
        return True

    def start_app(self, package, activity=None):
        """
        Start the application

        Without an `activity`, `adb shell monkey -p <package> -c
        android.intent.category.LAUNCHER 1` is used. With an `activity`,
        `adb shell am start -n <package>/<package>.<activity>` is used.

        Args:
            package: package name
            activity: activity name; it is appended to the package name with a dot

        Returns:
            None

        """
        if activity:
            cmd = ['am', 'start', '-n', '%s/%s.%s' % (package, package, activity)]
        else:
            cmd = ['monkey', '-p', package, '-c', 'android.intent.category.LAUNCHER', '1']
        self.shell(cmd)
    def start_app_timing(self, package, activity):
        """
        Start `<package>/<activity>` with `adb shell am start -S -W` and report
        the `TotalTime` value from the command output

        Args:
            package: package name
            activity: activity name

        Raises:
            AirtestError: if the output does not contain `Status: ok`

        Returns:
            `TotalTime` value as an int, or 0 if the output has no such entry

        """
        component = '%s/%s' % (package, activity)
        cmd = ['am', 'start', '-S', '-W', component,
               '-c', 'android.intent.category.LAUNCHER', '-a', 'android.intent.action.MAIN']
        out = self.shell(cmd)
        if re.search(r"Status:\s*ok", out) is None:
            raise AirtestError("Starting App: %s/%s Failed!" % (package, activity))

        total = re.search(r"TotalTime:\s*(\d+)", out)
        return int(total.group(1)) if total else 0

    def stop_app(self, package):
        """
        Force stop the application with `adb shell am force-stop <package>`

        Args:
            package: package name

        Returns:
            None

        """
        command = ['am', 'force-stop', package]
        self.shell(command)

    def clear_app(self, package):
        """
        Clear all application data with `adb shell pm clear <package>`

        Args:
            package: package name

        Returns:
            None

        """
        command = ['pm', 'clear', package]
        self.shell(command)

    def get_ip_address(self):
        """
        Try several shell commands to obtain the device IP address.

        The interfaces `eth0`, `eth1` and `wlan0` are checked in that order.
        For each one these commands are tried until one yields an address:

            * `adb shell ip -f inet addr show <interface>`
            * `adb shell ifconfig`
            * `adb shell netcfg`
            * `adb shell getprop dhcp.<interface>.ipaddress`

        Addresses starting with `172.`, `127.` or `169.` are skipped.

        Returns:
            None if no IP address has been found, otherwise return the IP address

        """

        def run_quietly(command):
            """Run a shell command; an AdbShellError counts as empty output."""
            try:
                return self.shell(command)
            except AdbShellError:
                return ''

        def get_ip_address_from_interface(interface):
            """Get device ip from target network interface."""
            # android >= 6.0: ip -f inet addr show {interface}
            out = run_quietly('ip -f inet addr show {}'.format(interface))
            hit = re.search(r"inet (\d+\.){3}\d+", out)
            if hit:
                return hit.group().split(" ")[-1]

            # android >= 6.0 backup method: ifconfig
            out = run_quietly('ifconfig')
            hit = re.search(interface + r'.*?inet addr:((\d+\.){3}\d+)', out, re.DOTALL)
            if hit:
                return hit.group(1)

            # android <= 6.0: netcfg
            out = run_quietly('netcfg')
            hit = re.search(interface + r'.* ((\d+\.){3}\d+)/\d+', out)
            if hit:
                return hit.group(1)

            # android <= 6.0 backup method: getprop dhcp.{}.ipaddress
            out = run_quietly('getprop dhcp.{}.ipaddress'.format(interface))
            hit = IP_PATTERN.search(out)
            if hit:
                return hit.group(0)

            # no more methods to try
            return None

        skipped_prefixes = ('172.', '127.', '169.')
        for name in ('eth0', 'eth1', 'wlan0'):
            address = get_ip_address_from_interface(name)
            if address and not address.startswith(skipped_prefixes):
                return address

        return None

    def get_gateway_address(self):
        """
        Obtain the gateway address.

        `adb shell getprop dhcp.wlan0.gateway` is tried first. If it yields no
        address, the gateway is computed as the first address of the subnet
        formed from `self.get_ip_address()` and `self._get_subnet_mask_len()`.

        Returns:
            None if no gateway address has been found, otherwise return the gateway address

        """

        def to_int(dotted):
            """Fold a dotted-quad string into one integer, first octet highest."""
            value = 0
            for octet in dotted.split('.'):
                value = (value << 8) + int(octet)
            return value

        def to_dotted(number):
            """Render the low 32 bits of an integer as a dotted-quad string."""
            return '.'.join([str((number >> shift) & 0xFF) for shift in (24, 16, 8, 0)])

        try:
            prop_out = self.shell('getprop dhcp.wlan0.gateway')
        except AdbShellError:
            prop_out = ''
        found = IP_PATTERN.search(prop_out)
        if found:
            return found.group(0)

        ip = self.get_ip_address()
        if not ip:
            return None
        mask_len = self._get_subnet_mask_len()
        # keep the top `mask_len` bits of the address, then take the next address
        network_mask = ((1 << mask_len) - 1) << (32 - mask_len)
        return to_dotted((to_int(ip) & network_mask) + 1)

    def _get_subnet_mask_len(self):
        """
        Perform `adb shell netcfg | grep wlan0` command to obtain mask length

        Returns:
            17 if mask length could not be detected, otherwise the mask length

        """
        try:
            res = self.shell('netcfg')
        except AdbShellError:
            pass
        else:
            matcher = re.search(r'wlan0.* (\d+\.){3}\d+/(\d+) ', res)
            if matcher:
                return int(matcher.group(2))
        # 获取不到网段长度就默认取17
        print('[iputils WARNING] fail to get subnet mask len. use 17 as default.')
        return 17

    def get_memory(self):
        """
        Run `adb shell dumpsys meminfo` and report the `Total RAM` value in rounded gigabytes

        A value containing commas is treated as comma-grouped: the first group
        plus one thousandth of the second group is rounded. A value without
        commas is divided by 1,000,000 and rounded.

        Returns:
            string made of the rounded number followed by `G`

        """
        meminfo = self.shell("dumpsys meminfo")
        total = re.match(r".*Total RAM:\s+(\S+)\s+", meminfo, re.DOTALL).group(1)
        if ',' not in total:
            gigabytes = round(float(total) / 1000.0 / 1000.0)
        else:
            groups = total.split(',')
            gigabytes = round(int(groups[0]) + (float(groups[1]) / 1000.0))
        return str(gigabytes) + 'G'

    def get_storage(self):
        """
        Run `adb shell df /data` and map the reported size to a nominal storage class

        The size is the token right after `/data`; if the output has no such
        token, it is the token four fields before `/data`. A value with `G` is
        rounded as is, one with `M` is divided by 1000, anything else is divided
        by 1,000,000.

        Returns:
            one of '128G', '64G', '32G', '16G' or '8G'

        """
        df_out = self.shell("df /data")
        found = re.match(r".*\/data\s+(\S+)", df_out, re.DOTALL)
        if not found:
            found = re.match(r".*\s+(\S+)\s+\S+\s+\S+\s+\S+\s+\/data", df_out, re.DOTALL)
        size = found.group(1)

        if 'G' in size:
            gigabytes = round(float(size[:-1]))
        elif 'M' in size:
            gigabytes = round(float(size[:-1]) / 1000.0)
        else:
            gigabytes = round(float(size) / 1000.0 / 1000.0)

        # the first lower bound that the size exceeds selects the label
        for lower_bound, label in ((64, '128G'), (32, '64G'), (16, '32G'), (8, '16G')):
            if gigabytes > lower_bound:
                return label
        return '8G'

    def get_cpuinfo(self):
        """
        Run `adb shell cat /proc/cpuinfo` and extract the CPU count and name

        The name is taken from the first line starting with `Hardware`, or
        failing that from the first line starting with `Processor`. These lines
        are looked up anywhere in the output, not only at its very beginning.

        Returns:
            dict with `cpuNum` (number of occurrences of "processor" in the
            output) and `cpuName` (None if neither line is present)

        """
        res = self.shell("cat /proc/cpuinfo").strip()
        cpuNum = res.count("processor")
        cpuName = None
        for field in ('Hardware', 'Processor'):
            # MULTILINE + search: the field may sit on any line of the output
            m = re.search(r'^' + field + r'\s+:\s+(\w+.*)', res, re.MULTILINE)
            if m:
                # '.' also matches a carriage return left over from CRLF line endings
                cpuName = m.group(1).replace('\r', '')
                break
        return dict(cpuNum=cpuNum, cpuName=cpuName)

    def get_cpufreq(self):
        """
        Read `/sys/devices/system/cpu/cpu0/cpufreq/cpuinfo_max_freq` via adb shell
        and format it as a GHz string

        Returns:
            the value divided by 1,000,000, rounded to one decimal, followed by `GHz`

        """
        raw_value = self.shell("cat /sys/devices/system/cpu/cpu0/cpufreq/cpuinfo_max_freq")
        gigahertz = round(float(raw_value) / 1000 / 1000, 1)
        label = str(gigahertz) + 'GHz'
        return label.strip()

    def get_cpuabi(self):
        """
        Run `adb shell getprop ro.product.cpu.abi`

        Returns:
            command output with surrounding whitespace stripped

        """
        return self.shell("getprop ro.product.cpu.abi").strip()

    def get_gpu(self):
        """
        Run `adb shell dumpsys SurfaceFlinger` and parse the `GLES:` line

        The text after `GLES:` is split on commas. The second field is the GPU
        model, and the first three whitespace-separated words of the third
        field are the OpenGL version. A missing field yields an empty string.

        Returns:
            None if the output has no `GLES:` line, otherwise a dict with
            `gpuModel` and `opengl`

        """
        res = self.shell("dumpsys SurfaceFlinger")
        m = re.search(r'GLES:\s+(.*)', res)
        if not m:
            return None
        fields = m.group(1).split(',')
        # each guard matches the index it protects (fields[1] needs 2 items, fields[2] needs 3)
        gpuModel = fields[1].strip() if len(fields) > 1 else ""
        opengl = ""
        if len(fields) > 2:
            m2 = re.search(r'(\S+\s+\S+\s+\S+).*', fields[2])
            if m2:
                opengl = m2.group(1)
        return dict(gpuModel=gpuModel, opengl=opengl)

    def get_model(self):
        """
        Read the `ro.product.model` property

        Returns:
            whatever `self.getprop` returns for that property

        """
        model = self.getprop("ro.product.model")
        return model

    def get_manufacturer(self):
        """
        Read the `ro.product.manufacturer` property

        Returns:
            whatever `self.getprop` returns for that property

        """
        manufacturer = self.getprop("ro.product.manufacturer")
        return manufacturer

    def get_device_info(self):
        """
        Collect android device information: platform, serialno, memory, storage,
        display, cpuinfo, cpufreq, cpuabi, sdkversion, gpu, model and manufacturer

        Callable entries are invoked to obtain their value; if a call raises,
        that entry is reported as None. Non-callable entries are used as they are.

        Returns:
            Dict of info

        """
        sources = {
            "platform": "Android",
            "serialno": self.serialno,
            "memory": self.get_memory,
            "storage": self.get_storage,
            "display": self.getPhysicalDisplayInfo,
            "cpuinfo": self.get_cpuinfo,
            "cpufreq": self.get_cpufreq,
            "cpuabi": self.get_cpuabi,
            "sdkversion": self.sdk_version,
            "gpu": self.get_gpu,
            "model": self.get_model,
            "manufacturer": self.get_manufacturer,
        }

        def resolve(source):
            """Return a plain value unchanged; call a callable, mapping any failure to None."""
            if not callable(source):
                return source
            try:
                return source()
            except Exception:
                return None

        return {name: resolve(source) for name, source in sources.items()}

    def get_display_of_all_screen(self, info):
        """
        Perform `adb shell dumpsys window windows` commands to get window display of application.

        Args:
            info: device screen properties

        Returns:
            None if adb command failed to run, otherwise return device screen properties

        """
        output = self.shell("dumpsys window windows")
        windows = output.split("Window #")
        offsetx, offsety, x, y = info['width'], info['height'], 0, 0
        package = self._search_for_current_package(output)
        if package:
            for w in windows:
                if "package=%s" % package in w:
                    arr = re.findall(r'Frames: containing=\[(\d+\.?\d*),(\d+\.?\d*)]\[(\d+\.?\d*),(\d+\.?\d*)]', w)
                    if len(arr) >= 1 and len(arr[0]) == 4:
                        offsetx, offsety, x, y = float(arr[0][0]), float(arr[0][1]), float(arr[0][2]), float(arr[0][3])
                        if info["orientation"] in [1, 3]:
                            offsetx, offsety, x, y = offsety, offsetx, y, x
                        x, y = x - offsetx, y - offsety
        return {
            "offset_x": offsetx,
            "offset_y": offsety,
            "offset_width": x,
            "offset_height": y
        }

    def _search_for_current_package(self, ret):
        """
        Search for current app package name from the output of command "adb shell dumpsys window windows"

        Returns:
            package name if exists else ""
        """
        try:
            packageRE = re.compile('\s*mCurrentFocus=Window{.* ([A-Za-z0-9_.]+)/[A-Za-z0-9_.]+}')
            m = packageRE.findall(ret)
            if m:
                return m[-1]
            else:
                return self.get_top_activity()[0]
        except Exception as e:
            print("[Error] Cannot get current top activity")
        return ""


def cleanup_adb_forward():
    """Call `_cleanup_forwards()` on every object stored in `ADB._instances`."""
    for instance in ADB._instances:
        instance._cleanup_forwards()


# Hand the forward-cleanup hook to reg_cleanup (imported from airtest.utils.snippet).
# NOTE(review): presumably reg_cleanup runs it at interpreter exit -- confirm.
reg_cleanup(cleanup_adb_forward)


# if __name__ == "__main__":

欢迎小伙伴关注微信公众号ID:gameTesterGz
或关注我的CSDN:https://blog.csdn.net/qq_32557025
谢谢各位的关注、点赞!
微信二维码

标签:shell,return,package,self,ADB,UI,AirTest,device,adb
来源: https://blog.csdn.net/qq_32557025/article/details/121838284

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有