diff --git a/airtest/aircv/screen_recorder.py b/airtest/aircv/screen_recorder.py index c9208ad4..c930a651 100644 --- a/airtest/aircv/screen_recorder.py +++ b/airtest/aircv/screen_recorder.py @@ -9,6 +9,7 @@ import time import numpy as np import subprocess +import traceback RECORDER_ORI = { @@ -102,9 +103,16 @@ def write(self, frame): self.writer.write(frame.astype(np.uint8)) def close(self): - self.writer.close() - self.process.wait() - self.process.terminate() + try: + self.writer.close() + self.process.wait(timeout=5) + except Exception as e: + print(f"Error closing ffmpeg process: {e}") + finally: + try: + self.process.terminate() + except Exception as e: + print(f"Error terminating ffmpeg process: {e}") class ScreenRecorder: @@ -160,6 +168,7 @@ def stop(self): self._stop_flag = True self.t_write.join() self.t_stream.join() + self.writer.close() # Ensure writer is closed def get_frame_loop(self): # 单独一个线程持续截图 @@ -177,7 +186,6 @@ def get_frame_loop(self): raise def write_frame_loop(self): - # 按帧率间隔获取图像写入视频 try: duration = 1.0/self.writer.fps last_time = time.time() @@ -185,7 +193,11 @@ def write_frame_loop(self): while True: if time.time()-last_time >= duration: last_time += duration - self.writer.write(self.tmp_frame) + try: + self.writer.write(self.tmp_frame) + except Exception as e: + print(f"Error writing frame: {e}") + break if self.is_stop(): break time.sleep(0.0001) @@ -194,4 +206,5 @@ def write_frame_loop(self): except Exception as e: print("write thread error", e) self._stop_flag = True + self.writer.close() # Ensure the writer is closed on error raise diff --git a/airtest/core/android/adb.py b/airtest/core/android/adb.py index 849d70c7..f0cc006e 100644 --- a/airtest/core/android/adb.py +++ b/airtest/core/android/adb.py @@ -25,6 +25,7 @@ from airtest.utils.snippet import get_std_encoding, reg_cleanup, split_cmd, make_file_executable LOGGING = get_logger(__name__) +TMP_PATH = "/data/local/tmp" # Android's temporary file directory class ADB(object): @@ -470,16 +471,10 @@ def sdk_version(self): def push(self, local, remote): """ - Perform `adb push` command - - Note: - If there is a space (or special symbol) in the file name, it will be forced to add escape characters, - and the new file name will be added with quotation marks and returned as the return value - - 注意:文件名中如果带有空格(或特殊符号),将会被强制增加转义符,并将新的文件名添加引号,作为返回值返回 + Push file or folder to the specified directory to the device Args: - local: local file to be copied to the device + local: local file or folder to be copied to the device remote: destination on the device where the file will be copied Returns: @@ -495,18 +490,65 @@ def push(self, local, remote): "/data/local/tmp/test\ space.txt" >>> adb.shell("rm " + new_name) + >>> adb.push("test_dir", "/sdcard/Android/data/com.test.package/files") + >>> adb.push("test_dir", "/sdcard/Android/data/com.test.package/files/test_dir") + """ - local = decode_path(local) # py2 - if os.path.isfile(local) and os.path.splitext(local)[-1] != os.path.splitext(remote)[-1]: - # If remote is a folder, add the filename and escape - filename = os.path.basename(local) - # Add escape characters for spaces, parentheses, etc. in filenames - filename = re.sub(r"[ \(\)\&]", lambda m: "\\" + m.group(0), filename) - remote = '%s/%s' % (remote, filename) - self.cmd(["push", local, remote], ensure_unicode=False) - return '\"%s\"' % remote + _, ext = os.path.splitext(remote) + if ext: + # The target path is a file + dst_parent = os.path.dirname(remote) + else: + dst_parent = remote - def pull(self, remote, local): + # If the target file already exists, delete it first to avoid overwrite failure + src_filename = os.path.basename(local) + _, src_ext = os.path.splitext(local) + if src_ext: + dst_path = f"{dst_parent}/{src_filename}" + else: + if src_filename == os.path.basename(remote): + dst_path = remote + else: + dst_path = f"{dst_parent}/{src_filename}" + try: + self.shell(f"rm -r {dst_path}") + except: + pass + + # If the target folder has multiple levels that have never been created, try to create them + try: + self.shell(f"mkdir -p {dst_parent}") + except: + pass + + # Push the file to the tmp directory to avoid permission issues + tmp_path = f"{TMP_PATH}/{src_filename}" + try: + self.cmd(["push", local, tmp_path]) + except: + self.cmd(["push", local, dst_parent]) + else: + try: + if src_ext: + try: + self.shell(f'mv "{tmp_path}" "{remote}"') + except: + self.shell(f'mv "{tmp_path}" "{remote}"') + else: + try: + self.shell(f'cp -frp "{tmp_path}/*" "{remote}"') + except: + self.shell(f'mv "{tmp_path}" "{remote}"') + finally: + try: + if TMP_PATH != dst_parent: + self.shell(f'rm -r "{tmp_path}"') + except: + pass + return dst_path + + def pull(self, remote, local=""): """ Perform `adb pull` command @@ -521,6 +563,8 @@ def pull(self, remote, local): Returns: None """ + if not local: + local = os.path.basename(remote) local = decode_path(local) # py2 if PY3: # If python3, use Path to force / convert to \ @@ -921,8 +965,8 @@ def exists_file(self, filepath): """ try: - out = self.shell(["ls", filepath]) - except AdbShellError: + out = self.shell("ls \"%s\"" % filepath) + except (AdbShellError, AdbError): return False else: return not ("No such file or directory" in out) diff --git a/airtest/core/android/android.py b/airtest/core/android/android.py index 30028414..b7ad41ab 100644 --- a/airtest/core/android/android.py +++ b/airtest/core/android/android.py @@ -1030,6 +1030,41 @@ def set_clipboard(self, text): """ self.yosemite_ext.set_clipboard(text) + def push(self, local, remote): + """ + Push file to the device + + Args: + local: local file or folder to be copied to the device + remote: destination on the device where the file will be copied + + Returns: + The file path saved in the phone may be enclosed in quotation marks, eg. '"test\ file.txt"' + + Examples: + >>> dev = connect_device("android:///") + >>> dev.push("test.txt", "/sdcard/test.txt") + + """ + return self.adb.push(local, remote) + + def pull(self, remote, local=""): + """ + Pull file from the device + + Args: + remote: remote file to be downloaded from the device + local: local destination where the file will be downloaded from the device, if not specified, the current directory is used + + Returns: + None + + Examples: + >>> dev = connect_device("android:///") + >>> dev.pull("/sdcard/test.txt", "rename.txt") + """ + return self.adb.pull(remote, local=local) + def _register_rotation_watcher(self): """ Register callbacks for Android and minicap when rotation of screen has changed diff --git a/airtest/core/api.py b/airtest/core/api.py index a2e4a8bc..b38d9572 100644 --- a/airtest/core/api.py +++ b/airtest/core/api.py @@ -724,6 +724,58 @@ def paste(*args, **kwargs): G.DEVICE.paste(*args, **kwargs) +@logwrap +def push(local, remote, *args, **kwargs): + """ + Push file from local to remote + + :param local: local file path + :param remote: remote file path + :return: filename of the pushed file + :platforms: Android, iOS + :Example: + + Android:: + + >>> connect_device("android:///") + >>> push(r"D:\demo\test.text", "/data/local/tmp/test.text") + + + iOS:: + + >>> connect_device("iOS:///http+usbmux://udid") + >>> push("test.png", "/DCIM/") # push to the DCIM directory + >>> push(r"D:\demo\test.text", "/Documents", bundle_id="com.apple.Keynote") # push to the Documents directory of the Keynote app + + """ + return G.DEVICE.push(local, remote, *args, **kwargs) + + +@logwrap +def pull(remote, local, *args, **kwargs): + """ + Pull file from remote to local + + :param remote: remote file path + :param local: local file path + :return: filename of the pulled file + :platforms: Android, iOS + :Example: + + Android:: + + >>> connect_device("android:///") + >>> pull("/data/local/tmp/test.txt", r"D:\demo\test.txt") + + iOS:: + + >>> connect_device("iOS:///http+usbmux://udid") + >>> pull("/DCIM/test.png", r"D:\demo\test.png") + >>> pull("/Documents/test.key", r"D:\demo\test.key", bundle_id="com.apple.Keynote") + + """ + return G.DEVICE.pull(remote, local, *args, **kwargs) + """ Assertions: see airtest/core/assertions.py """ diff --git a/airtest/core/ios/ios.py b/airtest/core/ios/ios.py index 7e69ee85..4bf5fe9b 100644 --- a/airtest/core/ios/ios.py +++ b/airtest/core/ios/ios.py @@ -11,6 +11,7 @@ import base64 import inspect import logging +import pathlib import traceback from logzero import setup_logger from functools import wraps @@ -31,7 +32,7 @@ from airtest.core.settings import Settings as ST from airtest.aircv.screen_recorder import ScreenRecorder, resize_by_max, get_max_size from airtest.core.error import LocalDeviceError, AirtestError - +from airtest.core.helper import logwrap LOGGING = get_logger(__name__) @@ -45,6 +46,7 @@ def decorator_retry_session(func): 当因为session失效而操作失败时,尝试重新获取session,最多重试3次。 """ + @wraps(func) def wrapper(self, *args, **kwargs): try: @@ -57,7 +59,8 @@ def wrapper(self, *args, **kwargs): except: time.sleep(0.5) continue - raise + raise AirtestError("Failed to re-acquire session.") + return wrapper @@ -65,13 +68,19 @@ def decorator_pairing_dialog(func): """ When the device is not paired, trigger the trust dialogue and try again. """ + @wraps(func) def wrapper(*args, **kwargs): try: return func(*args, **kwargs) except MuxError: - LOGGING.error("Device is not yet paired. Triggered the trust dialogue. Please accept and try again." + "(iTunes is required on Windows.) " if sys.platform.startswith("win") else "") + if sys.platform.startswith("win"): + error_msg = "Device is not yet paired. Triggered the trust dialogue. Please accept and try again. (iTunes is required on Windows.) " + else: + error_msg = "Device is not yet paired. Triggered the trust dialogue. Please accept and try again." + LOGGING.error(error_msg) raise + return wrapper @@ -85,6 +94,7 @@ def add_decorator_to_methods(decorator): Returns: - decorator_wrapper: A function that takes a class as input and decorates all the methods of the class by applying the input decorator to each method. """ + def decorator_wrapper(cls): # 获取要装饰的类的所有方法 methods = [attr for attr in dir(cls) if callable(getattr(cls, attr)) and not attr.startswith("_")] @@ -94,9 +104,24 @@ def decorator_wrapper(cls): setattr(cls, method, decorator(getattr(cls, method))) return cls + return decorator_wrapper +def format_file_list(file_list): + formatted_list = [] + for file in file_list: + file_info = { + 'type': 'Directory' if file[0] == 'd' else 'File', + 'size': file[1], + 'last_modified': file[2].strftime('%Y-%m-%d %H:%M:%S'), + 'name': file[3] + } + formatted_list.append(file_info) + + return formatted_list + + @add_decorator_to_methods(decorator_pairing_dialog) class TIDevice: """Below staticmethods are provided by Tidevice. @@ -108,11 +133,11 @@ def devices(): Get all available devices connected by USB, return a list of UDIDs. Returns: - list: A list of UDIDs. + list: A list of UDIDs. e.g. ['539c5fffb18f2be0bf7f771d68f7c327fb68d2d9'] """ return Usbmux().device_udid_list() - + @staticmethod def list_app(udid, app_type="user"): """ @@ -158,7 +183,7 @@ def list_wda(udid): if (bundle_id.startswith('com.') and bundle_id.endswith(".xctrunner")) or display_name == "WebDriverAgentRunner-Runner": wda_list.append(bundle_id) return wda_list - + @staticmethod def device_info(udid): """ @@ -191,8 +216,8 @@ def device_info(udid): 'BasebandVersion' """ for attr in ('ProductVersion', 'ProductType', - 'ModelNumber', 'SerialNumber', 'PhoneNumber', - 'TimeZone', 'UniqueDeviceID'): + 'ModelNumber', 'SerialNumber', 'PhoneNumber', + 'TimeZone', 'UniqueDeviceID'): key = attr[0].lower() + attr[1:] if attr in device_info: tmp_dict[key] = device_info[attr] @@ -201,7 +226,7 @@ def device_info(udid): except: tmp_dict["marketName"] = "" return tmp_dict - + @staticmethod def install_app(udid, file_or_url): BaseDevice(udid, Usbmux()).app_install(file_or_url) @@ -246,7 +271,7 @@ def ps(udid): continue ps_list.append({key: p[key] for key in keys}) return ps_list - + @staticmethod def ps_wda(udid): """Get all running WDA on device that meet certain naming rules. @@ -266,10 +291,308 @@ def ps_wda(udid): else: continue return ps_wda_list - + @staticmethod def xctest(udid, wda_bundle_id): - return BaseDevice(udid, Usbmux()).xctest(fuzzy_bundle_id=wda_bundle_id, logger=setup_logger(level=logging.INFO)) + try: + return BaseDevice(udid, Usbmux()).xctest(fuzzy_bundle_id=wda_bundle_id, + logger=setup_logger(level=logging.INFO)) + except Exception as e: + print( + f"Failed to run tidevice xctest function for {wda_bundle_id}.Try to run tidevice runwda function for {wda_bundle_id}.") + try: + return BaseDevice(udid, Usbmux()).runwda(fuzzy_bundle_id=wda_bundle_id) + except Exception as e: + print(f"Failed to run tidevice runwda function for {wda_bundle_id}.") + # 先不抛出异常,ios17的兼容未合并进来,ios17设备一定会报错 + # raise AirtestError(f"Failed to start XCTest for {wda_bundle_id}.") + + @staticmethod + def push(udid, local_path, device_path, bundle_id=None, timeout=None): + """ + Pushes a file or a directory from the local machine to the iOS device. + + Args: + udid (str): The UDID of the iOS device. + device_path (str): The directory path on the iOS device where the file or directory will be pushed. + local_path (str): The local path of the file or directory to be pushed. + bundle_id (str, optional): The bundle ID of the app. If provided, the file or directory will be pushed to the app's sandbox container. Defaults to None. + timeout (int, optional): The timeout in seconds for the remote device operation. Defaults to None. + + Examples: + + Push a file to the DCIM directory:: + + >>> TIDevice.push("00008020-001270842E88002E", "C:/Users/username/Pictures/photo.jpg", "/DCIM") + >>> TIDevice.push("00008020-001270842E88002E", "C:/Users/username/Pictures/photo.jpg", "/DCIM/photo.jpg") + + Push a directory to the Documents directory of the Keynote app:: + + >>> TIDevice.push("00008020-001270842E88002E", "C:/Users/username/test.key", "/Documents", "com.apple.Keynote") + >>> TIDevice.push("00008020-001270842E88002E", "C:/Users/username/test.key", "/Documents/test.key", "com.apple.Keynote") + """ + try: + if not os.path.exists(local_path): + raise AirtestError(f"Local path {local_path} does not exist.") + + if bundle_id: + sync = BaseDevice(udid, Usbmux()).app_sync(bundle_id) + else: + sync = BaseDevice(udid, Usbmux()).sync + + if device_path.endswith("/") or device_path.endswith("\\"): + device_path = device_path[:-1] + + if os.path.isfile(local_path): + file_name = os.path.basename(local_path) + # 如果device_path有后缀则认为是文件,和本地文件名不一样视为需要重命名 + if not os.path.splitext(device_path)[1]: + if os.path.basename(device_path) != file_name: + device_path = os.path.join(device_path, file_name) + device_path = device_path.replace("\\", "/") + # Create the directory if it does not exist + sync.mkdir(os.path.dirname(device_path)) + + with open(local_path, "rb") as f: + content = f.read() + sync.push_content(device_path, content) + elif os.path.isdir(local_path): + device_path = os.path.join(device_path, os.path.basename(local_path)) + device_path = device_path.replace("\\", "/") + sync.mkdir(device_path) + for root, dirs, files in os.walk(local_path): + # 创建文件夹 + for directory in dirs: + dir_path = os.path.join(root, directory) + relative_dir_path = os.path.relpath(dir_path, local_path) + device_dir_path = os.path.join(device_path, relative_dir_path) + device_dir_path = device_dir_path.replace("\\", "/") + sync.mkdir(device_dir_path) + # 上传文件 + for file_name in files: + file_path = os.path.join(root, file_name) + relative_path = os.path.relpath(file_path, local_path) + device_file_path = os.path.join(device_path, relative_path) + device_file_path = device_file_path.replace("\\", "/") + with open(file_path, "rb") as f: + content = f.read() + sync.push_content(device_file_path, content) + print(f"pushed {local_path} to {device_path}") + except Exception as e: + raise AirtestError( + f"Failed to push {local_path} to {device_path}. If push a FILE, please check if there is a DIRECTORY with the same name already exists. If push a DIRECTORY, please check if there is a FILE with the same name already exists, and try again.") + + @staticmethod + def pull(udid, device_path, local_path, bundle_id=None, timeout=None): + """ + Pulls a file or directory from the iOS device to the local machine. + + Args: + udid (str): The UDID of the iOS device. + device_path (str): The path of the file or directory on the iOS device. + Remote devices can only be file paths. + local_path (str): The destination path on the local machine. + Remote devices can only be file paths. + bundle_id (str, optional): The bundle ID of the app. If provided, the file or directory will be pulled from the app's sandbox. Defaults to None. + timeout (int, optional): The timeout in seconds for the remote device operation. Defaults to None. + + Examples: + + Pull a file from the DCIM directory:: + + >>> TIDevice.pull("00008020-001270842E88002E", "/DCIM/photo.jpg", "C:/Users/username/Pictures/photo.jpg") + >>> TIDevice.pull("00008020-001270842E88002E", "/DCIM/photo.jpg", "C:/Users/username/Pictures") + + Pull a directory from the Documents directory of the Keynote app:: + + >>> TIDevice.pull("00008020-001270842E88002E", "/Documents", "C:/Users/username/Documents", "com.apple.Keynote") + >>> TIDevice.pull("00008020-001270842E88002E", "/Documents", "C:/Users/username/Documents", "com.apple.Keynote") + + """ + try: + if bundle_id: + sync = BaseDevice(udid, Usbmux()).app_sync(bundle_id) + else: + sync = BaseDevice(udid, Usbmux()).sync + + if TIDevice.is_dir(udid, device_path, bundle_id): + os.makedirs(local_path, exist_ok=True) + + src = pathlib.Path(device_path) + dst = pathlib.Path(local_path) + if dst.is_dir() and src.name and sync.stat(src).is_dir(): + dst = dst.joinpath(src.name) + + sync.pull(src, dst) + print("pulled", src, "->", dst) + except Exception as e: + raise AirtestError(f"Failed to pull {device_path} to {local_path}.") + + @staticmethod + def rm(udid, remote_path, bundle_id=None): + """ + Removes a file or directory from the iOS device. + + Args: + udid (str): The UDID of the iOS device. + remote_path (str): The path of the file or directory on the iOS device. + bundle_id (str, optional): The bundle ID of the app. If provided, the file or directory will be removed from the app's sandbox. Defaults to None. + + Examples: + Remove a file from the DCIM directory:: + + >>> TIDevice.rm("00008020-001270842E88002E", "/DCIM/photo.jpg") + >>> TIDevice.rm("00008020-001270842E88002E", "/DCIM/photo.jpg", "com.apple.Photos") + + Remove a directory from the Documents directory of the Keynote app:: + + >>> TIDevice.rm("00008020-001270842E88002E", "/Documents", "com.apple.Keynote") + """ + + def _check_status(status, path): + if status == 0: + print("removed", path) + else: + raise AirtestError(f"<{status.name} {status.value}> Failed to remove {path}") + + def _remove_folder(udid, folder_path, bundle_id): + folder_path = folder_path.replace("\\", "/") + for file_info in TIDevice.ls(udid, folder_path, bundle_id): + if file_info['type'] == 'Directory': + _remove_folder(udid, os.path.join(folder_path, file_info['name']), bundle_id) + else: + status = sync.remove(os.path.join(folder_path, file_info['name'])) + _check_status(status, os.path.join(folder_path, file_info['name'])) + # remove the folder itself + status = sync.remove(folder_path) + _check_status(status, folder_path) + + if bundle_id: + sync = BaseDevice(udid, Usbmux()).app_sync(bundle_id) + else: + sync = BaseDevice(udid, Usbmux()).sync + + if TIDevice.is_dir(udid, remote_path, bundle_id): + if not remote_path.endswith("/"): + remote_path += "/" + _remove_folder(udid, remote_path, bundle_id) + else: + status = sync.remove(remote_path) + _check_status(status, remote_path) + + @staticmethod + def ls(udid, remote_path, bundle_id=None): + """ + List files and directories in the specified path on the iOS device. + + Args: + udid (str): The UDID of the iOS device. + remote_path (str): The path on the iOS device. + bundle_id (str, optional): The bundle ID of the app. Defaults to None. + + Returns: + list: A list of files and directories in the specified path. + + Examples: + + List files and directories in the DCIM directory:: + + >>> print(TIDevice.ls("00008020-001270842E88002E", "/DCIM")) + [{'type': 'Directory', 'size': 96, 'last_modified': '2021-12-01 15:30:13', 'name': '100APPLE/'}, {'type': 'Directory', 'size': 96, 'last_modified': '2021-07-20 17:29:01', 'name': '.MISC/'}] + + List files and directories in the Documents directory of the Keynote app:: + + >>> print(TIDevice.ls("00008020-001270842E88002E", "/Documents", "com.apple.Keynote")) + [{'type': 'File', 'size': 302626, 'last_modified': '2024-06-25 11:25:25', 'name': '演示文稿.key'}] + """ + try: + file_list = [] + if bundle_id: + sync = BaseDevice(udid, Usbmux()).app_sync(bundle_id) + else: + sync = BaseDevice(udid, Usbmux()).sync + if remote_path.endswith("/") or remote_path.endswith("\\"): + remote_path = remote_path[:-1] + for file_info in sync.listdir_info(remote_path): + filename = file_info.st_name + if file_info.is_dir(): + filename = filename + "/" + file_list.append(['d' if file_info.is_dir() else '-', file_info.st_size, file_info.st_mtime, filename]) + file_list = format_file_list(file_list) + return file_list + except Exception as e: + raise AirtestError(f"Failed to list files and directories in {remote_path}.") + + @staticmethod + def mkdir(udid, remote_path, bundle_id=None): + """ + Create a directory on the iOS device. + + Args: + udid (str): The UDID of the iOS device. + remote_path (str): The path of the directory to be created on the iOS device. + bundle_id (str, optional): The bundle ID of the app. Defaults to None. + + Examples: + Create a directory in the DCIM directory:: + + >>> TIDevice.mkdir("00008020-001270842E88002E", "/DCIM/test") + + Create a directory in the Documents directory of the Keynote app:: + + >>> TIDevice.mkdir("00008020-001270842E88002E", "/Documents/test", "com.apple.Keynote") + + """ + if bundle_id: + sync = BaseDevice(udid, Usbmux()).app_sync(bundle_id) + else: + sync = BaseDevice(udid, Usbmux()).sync + + status = sync.mkdir(remote_path) + if int(status) == 0: + print("created", remote_path) + else: + raise AirtestError(f"<{status.name} {status.value}> Failed to create directory {remote_path}") + + @staticmethod + def is_dir(udid, remote_path, bundle_id): + """ + Check if the specified path on the iOS device is a directory. + + Args: + udid (str): The UDID of the iOS device. + remote_path (str): The path on the iOS device. + bundle_id (str): The bundle ID of the app. + + Returns: + bool: True if the path is a directory, False otherwise. + + Examples: + Check if the DCIM directory is a directory:: + + >>> TIDevice.is_dir("00008020-001270842E88002E", "/DCIM") + True + + Check if the Documents directory of the Keynote app is a directory:: + + >>> TIDevice.is_dir("00008020-001270842E88002E", "/Documents", "com.apple.Keynote") + True + >>> TIDevice.is_dir("00008020-001270842E88002E", "/Documents/test.key", "com.apple.Keynote") + False + """ + try: + remote_path = remote_path.rstrip("\\/") + remote_path_dir, remote_path_base = os.path.split(remote_path) + file_info = TIDevice.ls(udid, remote_path_dir, bundle_id) + for info in file_info: + # Remove the trailing slash. + if info['name'].endswith("/"): + info['name'] = info['name'][:-1] + if info['name'] == f"{remote_path_base}": + return info['type'] == 'Directory' + except Exception as e: + raise AirtestError( + f"Failed to check if {remote_path} is a directory. Please check the path exist and try again.") @add_decorator_to_methods(decorator_retry_session) @@ -283,7 +606,8 @@ class IOS(Device): - ``iproxy $port 8100 $udid`` """ - def __init__(self, addr=DEFAULT_ADDR, cap_method=CAP_METHOD.MJPEG, mjpeg_port=None, udid=None, name=None, serialno=None, wda_bundle_id=None): + def __init__(self, addr=DEFAULT_ADDR, cap_method=CAP_METHOD.MJPEG, mjpeg_port=None, udid=None, name=None, + serialno=None, wda_bundle_id=None): super().__init__() # If none or empty, use default addr. @@ -309,12 +633,13 @@ def __init__(self, addr=DEFAULT_ADDR, cap_method=CAP_METHOD.MJPEG, mjpeg_port=No # e.g., connect local device http://127.0.0.1:8100 or http://localhost:8100 or http+usbmux://00008020-001270842E88002E self.udid = udid or name or serialno self._wda_bundle_id = wda_bundle_id - parsed = urlparse(self.addr).netloc.split(":")[0] if ":" in urlparse(self.addr).netloc else urlparse(self.addr).netloc + parsed = urlparse(self.addr).netloc.split(":")[0] if ":" in urlparse(self.addr).netloc else urlparse( + self.addr).netloc if parsed not in ["localhost", "127.0.0.1"] and "." in parsed: # Connect remote device via url. self.is_local_device = False - self.driver = wda.Client(self.addr) - else: + self.driver = wda.Client(self.addr) + else: # Connect local device via url. self.is_local_device = True if parsed in ["localhost", "127.0.0.1"]: @@ -357,25 +682,25 @@ def _get_default_device(self): if device_udid_list: return device_udid_list[0] raise IndexError("iOS devices not found, please connect device first.") - + def _get_default_wda_bundle_id(self): """Get local default device's WDA bundleID when no WDA bundleID. Returns: Local device's WDA bundleID. - """ + """ try: wda_list = TIDevice.list_wda(self.udid) return wda_list[0] except IndexError: raise IndexError("WDA bundleID not found, please install WDA on device.") - + def _get_default_running_wda_bundle_id(self): """Get the bundleID of the WDA that is currently running on local device. Returns: Local device's running WDA bundleID. - """ + """ try: running_wda_list = TIDevice.ps_wda(self.udid) return running_wda_list[0] @@ -387,7 +712,7 @@ def wda_bundle_id(self): if not self._wda_bundle_id and self.is_local_device: self._wda_bundle_id = self._get_default_wda_bundle_id() return self._wda_bundle_id - + @property def ip(self): """Returns the IP address of the host connected to the iOS phone. @@ -500,7 +825,7 @@ def _register_rotation_watcher(self): def window_size(self): """ - Returns: + Returns: Window size (width, height). """ try: @@ -688,7 +1013,7 @@ def _quick_click(self, x, y, duration): """ The method extended from the facebook-wda third-party library. Use modified appium wda to perform quick click. - + Args: x, y (int, float): float(percent), int(coordicate) duration (optional): tap_hold duration @@ -696,12 +1021,17 @@ def _quick_click(self, x, y, duration): x, y = self.driver._percent2pos(x, y) data = {'x': x, 'y': y, 'duration': duration} # 为了兼容改动直接覆盖原生接口的自制版wda。 + try: - return self.driver._session_http.post('/wda/tap', data=data) + self.driver._session_http.post('/wda/deviceTap', data=data) + # 如果找不到接口说明是低版本的wda,低于1.3版本没有此接口 except wda.WDARequestError as e: - if e.status == 110: - self.driver.click(x, y, duration) - + try: + return self.driver._session_http.post('/wda/tap', data=data) + except wda.WDARequestError as e: + if e.status == 110: + self.driver.click(x, y, duration) + def double_click(self, pos): x, y = self._transform_xy(pos) self.driver.double_tap(x, y) @@ -727,6 +1057,7 @@ def swipe(self, fpos, tpos, duration=0, delay=None, *args, **kwargs): fx, fy = int(fx * self.touch_factor), int(fy * self.touch_factor) if not (tx < 1 and ty < 1): tx, ty = int(tx * self.touch_factor), int(ty * self.touch_factor) + # 如果是通过ide来滑动,且安装的是自制版的wda就调用快速滑动接口,其他时候不关心滑动速度就使用原生接口保证滑动精确性。 def ios_tagent_swipe(fpos, tpos, delay=None): # 调用自定义的wda swipe接口需要进行坐标转换。 @@ -754,7 +1085,7 @@ def _quick_swipe(self, x1, y1, x2, y2, delay): """ The method extended from the facebook-wda third-party library. Use modified appium wda to perform quick swipe. - + Args: x1, y1, x2, y2 (int, float): float(percent), int(coordicate) delay (float): start coordinate to end coordinate duration (seconds) @@ -767,7 +1098,12 @@ def _quick_swipe(self, x1, y1, x2, y2, delay): data = dict(fromX=x1, fromY=y1, toX=x2, toY=y2, delay=delay) # 为了兼容改动直接覆盖原生接口的自制版wda。 try: - return self.driver._session_http.post('/wda/swipe', data=data) + if self.using_ios_tagent: + try: + self.driver._session_http.post('/wda/deviceSwipe', data=data) + # 如果找不到接口说明是低版本的wda,低于1.3版本没有此接口 + except wda.WDARequestError as e: + return self.driver._session_http.post('/wda/swipe', data=data) except wda.WDARequestError as e: if e.status == 110: self.driver.swipe(x1, y1, x2, y2) @@ -805,7 +1141,7 @@ def text(self, text, enter=True): if enter: text += '\n' self.driver.send_keys(text) - + def install_app(self, file_or_url, **kwargs): """ curl -X POST $JSON_HEADER \ @@ -827,11 +1163,11 @@ def install_app(self, file_or_url, **kwargs): if not self.is_local_device: raise LocalDeviceError() return TIDevice.install_app(self.udid, file_or_url) - + def uninstall_app(self, bundle_id): """Uninstall app from the device. - Notes: + Notes: It seems always return True. Args: @@ -863,10 +1199,10 @@ def start_app(self, bundle_id, *args, **kwargs): raise AirtestError(f"App launch timeout, please check if the app is installed: {bundle_id}") else: return TIDevice.start_app(self.udid, bundle_id) - + def stop_app(self, bundle_id): """ - Note: Both ways of killing the app may fail, nothing responds or just closes the + Note: Both ways of killing the app may fail, nothing responds or just closes the app to the background instead of actually killing it and no error will be reported. """ try: @@ -927,13 +1263,13 @@ def app_current(self): "bundleId": "com.netease.cloudmusic"} """ return self.driver.app_current() - + def get_clipboard(self, wda_bundle_id=None, *args, **kwargs): """Get clipboard text. - Before calling the WDA interface, you need to ensure that WDA was foreground. + Before calling the WDA interface, you need to ensure that WDA was foreground. If there are multiple WDA on your device, please specify the active WDA by parameter wda_bundle_id. - + Args: wda_bundle_id: The bundle id of the running WDA, if None, will use default WDA bundle id. @@ -944,7 +1280,7 @@ def get_clipboard(self, wda_bundle_id=None, *args, **kwargs): LocalDeviceError: If the device is remote and the wda_bundle_id parameter is not provided. Notes: - If you want to use this function, you have to set WDA foreground which would switch the + If you want to use this function, you have to set WDA foreground which would switch the current screen of the phone. Then we will try to switch back to the screen before. """ if wda_bundle_id is None: @@ -973,7 +1309,7 @@ def get_clipboard(self, wda_bundle_id=None, *args, **kwargs): else: LOGGING.warning("we can't switch back to the app before, because can't get bundle id.") return decoded_text - + def set_clipboard(self, content, wda_bundle_id=None, *args, **kwargs): """ Set the clipboard content on the device. @@ -1168,7 +1504,7 @@ def alert_wait(self, time_counter=2): return self.driver.alert.wait(time_counter) def alert_buttons(self): - """Get alert buttons text. + """Get alert buttons text. Notes: Might not work on all devices. @@ -1201,7 +1537,7 @@ def alert_click(self, buttons): return self.driver.alert.click(buttons) def home_interface(self): - """Get True for the device status is on home interface. + """Get True for the device status is on home interface. Reason: Some devices can Horizontal screen on the home interface. @@ -1230,7 +1566,7 @@ def disconnect(self): self.mjpegcap.teardown_stream() if self.rotation_watcher: self.rotation_watcher.teardown() - + def start_recording(self, max_time=1800, output=None, fps=10, snapshot_sleep=0.001, orientation=0, max_size=None, *args, **kwargs): """Start recording the device display. @@ -1280,7 +1616,7 @@ def start_recording(self, max_time=1800, output=None, fps=10, if self.recorder and self.recorder.is_running(): LOGGING.warning("recording is already running, please don't call again") return None - + logdir = "./" if ST.LOG_DIR is not None: logdir = ST.LOG_DIR @@ -1293,10 +1629,11 @@ def start_recording(self, max_time=1800, output=None, fps=10, save_path = os.path.join(logdir, output) max_size = get_max_size(max_size) + def get_frame(): data = self.get_frame_from_stream() frame = aircv.utils.string_2_img(data) - + if max_size is not None: frame = resize_by_max(frame, max_size) return frame @@ -1309,9 +1646,175 @@ def get_frame(): LOGGING.info("start recording screen to {}".format(save_path)) return save_path - def stop_recording(self,): + def stop_recording(self, ): """ Stop recording the device display. Recoding file will be kept in the device. """ LOGGING.info("stopping recording") self.recorder.stop() return None + + def push(self, local_path, remote_path, bundle_id=None, timeout=None): + """ + Pushes a file from the local machine to the iOS device. + + Args: + remote_path (str): The path on the iOS device where the file will be saved. + local_path (str): The path of the file on the local machine. + bundle_id (str, optional): The bundle identifier of the app. Defaults to None. + timeout (int, optional): The timeout in seconds for the remote device operation. Defaults to None. + + Raises: + LocalDeviceError: If the device is remote. + + Examples: + + >>> dev = connect_device("iOS:///http+usbmux://udid") + >>> dev.push("test.png", "/DCIM/test.png") + >>> dev.push("test.png", "/DCIM/test_rename.png") + >>> dev.push("test.key", "/Documents/", "com.apple.Keynote") # Push to the Documents directory of the Keynote app + >>> dev.push("test.key", "/Documents/test.key", "com.apple.Keynote") + + Push file without suffix cannot be renamed, so the following code will push file to the path considered as a directory + >>> dev.push("test", "/Documents/test", "com.apple.Keynote") # The pushed file will be /Documents/test + >>> dev.push("test", "/Documents/test_rename", "com.apple.Keynote") # The pushed file will be /Documents/test_rename/test + + """ + if not self.is_local_device: + raise LocalDeviceError() + TIDevice.push(self.udid, local_path, remote_path, bundle_id=bundle_id) + + def pull(self, remote_path, local_path, bundle_id=None, timeout=None): + """ + Pulls a file or directory from the iOS device to the local machine. + + Args: + remote_path (str): The path of the file or directory on the iOS device. + local_path (str): The path where the file or directory will be saved on the local machine. + bundle_id (str, optional): The bundle identifier of the app. Defaults to None. Required for remote devices. + timeout (int, optional): The timeout in seconds for the remote device operation. Defaults to None. + + Raises: + LocalDeviceError: If the device is remote. + + Examples: + + >>> dev = connect_device("iOS:///http+usbmux://udid") + >>> dev.pull("/DCIM/test.png", "test.png") + >>> dev.pull("/Documents/test.key", "test.key", "com.apple.Keynote") + >>> dev.pull("/Documents/test.key", "dir/test.key", "com.apple.Keynote") + >>> dev.pull("/Documents/test.key", "test_rename.key", "com.apple.Keynote") + + """ + if not self.is_local_device: + raise LocalDeviceError() + TIDevice.pull(self.udid, remote_path, local_path, bundle_id=bundle_id, timeout=timeout) + + @logwrap + def ls(self, remote_path, bundle_id=None): + """ + List files and directories in the specified remote path on the iOS device. + + Args: + remote_path (str): The remote path to list. + bundle_id (str, optional): The bundle ID of the app. Defaults to None. Required for remote devices. + + Returns: + list: A list of files and directories in the remote path. Each item in the list is a dictionary with the following keys: + - 'type': The type of the item. This can be 'Directory' or 'File'. + - 'size': The size of the item in bytes. + - 'last_modified': The last modification time of the item, in the format 'YYYY-MM-DD HH:MM:SS'. + - 'name': The name of the item, including the path relative to `remote_path`. + e.g. + [ + {'type': 'Directory', 'size': 1024, 'last_modified': 'YYYY-MM-DD HH:MM:SS', 'name': 'example_directory/'}, + {'type': 'File', 'size': 2048, 'last_modified': 'YYYY-MM-DD HH:MM:SS', 'name': 'example_file.txt'} + ] + + Raises: + LocalDeviceError: If the device is remote. + + Examples: + + List files and directories in the DCIM directory:: + + >>> dev = connect_device("iOS:///http+usbmux://udid") + >>> print(dev.ls("/DCIM/")) + [{'type': 'Directory', 'size': 96, 'last_modified': '2021-12-01 15:30:13', 'name': '100APPLE/'}, {'type': 'Directory', 'size': 96, 'last_modified': '2021-07-20 17:29:01', 'name': '.MISC/'}] + + List files and directories in the Documents directory of the Keynote app:: + + >>> print(dev.ls("/Documents", "com.apple.Keynote")) + [{'type': 'File', 'size': 302626, 'last_modified': '2024-06-25 11:25:25', 'name': 'test.key'}] + + """ + if not self.is_local_device: + raise LocalDeviceError() + return TIDevice.ls(self.udid, remote_path, bundle_id=bundle_id) + + @logwrap + def rm(self, remote_path, bundle_id=None): + """ + Remove a file or directory from the iOS device. + + Args: + remote_path (str): The remote path to remove. + bundle_id (str, optional): The bundle ID of the app. Defaults to None. + + Raises: + LocalDeviceError: If the device is remote. + AirtestError: If the file or directory does not exist. + + Examples: + + >>> dev = connect_device("iOS:///http+usbmux://udid") + >>> dev.rm("/Documents/test.key", "com.apple.Keynote") + >>> dev.rm("/Documents/test_dir", "com.apple.Keynote") + + """ + if not self.is_local_device: + raise LocalDeviceError() + TIDevice.rm(self.udid, remote_path, bundle_id=bundle_id) + + @logwrap + def mkdir(self, remote_path, bundle_id=None): + """ + Create a directory on the iOS device. + + Args: + remote_path (str): The remote path to create. + bundle_id (str, optional): The bundle ID of the app. Defaults to None. + + + Examples: + + >>> dev = connect_device("iOS:///http+usbmux://udid") + >>> dev.mkdir("/Documents/test_dir", "com.apple.Keynote") + + """ + if not self.is_local_device: + raise LocalDeviceError() + TIDevice.mkdir(self.udid, remote_path, bundle_id=bundle_id) + + def is_dir(self, remote_path, bundle_id=None): + """ + Check if the specified path on the iOS device is a directory. + + Args: + remote_path (str): The remote path to check. + bundle_id (str, optional): The bundle ID of the app. Defaults to None. + + Returns: + bool: True if the path is a directory, False otherwise. + + Exapmles: + + >>> dev = connect_device("iOS:///http+usbmux://udid") + >>> print(dev.is_dir("/DCIM/")) + True + >>> print(dev.is_dir("/Documents/test.key", "com.apple.Keynote")) + False + + """ + if not self.is_local_device: + raise LocalDeviceError() + return TIDevice.is_dir(self.udid, remote_path, bundle_id=bundle_id) diff --git a/airtest/core/win/win.py b/airtest/core/win/win.py index eb2c92af..a48d5fdd 100644 --- a/airtest/core/win/win.py +++ b/airtest/core/win/win.py @@ -24,15 +24,18 @@ from airtest.core.settings import Settings as ST from airtest.utils.snippet import get_absolute_coordinate from airtest.utils.logger import get_logger +from airtest.core.win.screen import screenshot LOGGING = get_logger(__name__) + def require_app(func): @wraps(func) def wrapper(inst, *args, **kwargs): if not inst.app: raise RuntimeError("Connect to an application first to use %s" % func.__name__) return func(inst, *args, **kwargs) + return wrapper @@ -89,7 +92,12 @@ def connect(self, handle=None, **kwargs): self.app = self._app.connect(**kwargs) self._top_window = self.app.top_window().wrapper_object() if kwargs.get("foreground", True) in (True, "True", "true"): - self.set_foreground() + try: + self.set_foreground() + except pywintypes.error as e: + # pywintypes.error: (0, 'SetForegroundWindow', 'No error message is available') + # If you are not running with administrator privileges, it may fail, but this error can be ignored. + pass def shell(self, cmd): """ @@ -107,6 +115,40 @@ def shell(self, cmd): """ return subprocess.check_output(cmd, shell=True) + def snapshot_old(self, filename=None, quality=10, max_size=None): + """ + Take a screenshot and save it in ST.LOG_DIR folder + + Args: + filename: name of the file to give to the screenshot, {time}.jpg by default + quality: The image quality, integer in range [1, 99] + max_size: the maximum size of the picture, e.g 1200 + + Returns: + display the screenshot + + """ + if self.handle: + screen = screenshot(filename, self.handle) + else: + screen = screenshot(filename) + if self.app: + rect = self.get_rect() + rect = self._fix_image_rect(rect) + screen = aircv.crop_image(screen, [rect.left, rect.top, rect.right, rect.bottom]) + if not screen.any(): + if self.app: + rect = self.get_rect() + rect = self._fix_image_rect(rect) + screen = aircv.crop_image(screenshot(filename), [rect.left, rect.top, rect.right, rect.bottom]) + if self._focus_rect != (0, 0, 0, 0): + height, width = screen.shape[:2] + rect = (self._focus_rect[0], self._focus_rect[1], width + self._focus_rect[2], height + self._focus_rect[3]) + screen = aircv.crop_image(screen, rect) + if filename: + aircv.imwrite(filename, screen, quality, max_size=max_size) + return screen + def snapshot(self, filename=None, quality=10, max_size=None): """ Take a screenshot and save it in ST.LOG_DIR folder @@ -127,12 +169,16 @@ def snapshot(self, filename=None, quality=10, max_size=None): "height": rect.bottom - rect.top, "monitor": 1} else: monitor = self.screen.monitors[0] - with mss.mss() as sct: - sct_img = sct.grab(monitor) - screen = numpy.array(sct_img, dtype=numpy.uint8)[...,:3] - if filename: - aircv.imwrite(filename, screen, quality, max_size=max_size) - return screen + try: + with mss.mss() as sct: + sct_img = sct.grab(monitor) + screen = numpy.array(sct_img, dtype=numpy.uint8)[..., :3] + if filename: + aircv.imwrite(filename, screen, quality, max_size=max_size) + return screen + except: + # if mss.exception.ScreenShotError: gdi32.GetDIBits() failed. + return self.snapshot_old(filename, quality, max_size) def _fix_image_rect(self, rect): """Fix rect in image.""" @@ -261,19 +307,19 @@ def touch(self, pos, **kwargs): time.sleep(interval) for i in range(1, steps): - x = int(start_x + (end_x-start_x) * i / steps) - y = int(start_y + (end_y-start_y) * i / steps) + x = int(start_x + (end_x - start_x) * i / steps) + y = int(start_y + (end_y - start_y) * i / steps) self.mouse.move(coords=(x, y)) time.sleep(interval) self.mouse.move(coords=(end_x, end_y)) - for i in range(1, offset+1): - self.mouse.move(coords=(end_x+i, end_y+i)) + for i in range(1, offset + 1): + self.mouse.move(coords=(end_x + i, end_y + i)) time.sleep(0.01) for i in range(offset): - self.mouse.move(coords=(end_x+offset-i, end_y+offset-i)) + self.mouse.move(coords=(end_x + offset - i, end_y + offset - i)) time.sleep(0.01) self.mouse.press(button=button, coords=(end_x, end_y)) @@ -601,7 +647,6 @@ def get_ip_address(self): return None - def start_recording(self, max_time=1800, output=None, fps=10, snapshot_sleep=0.001, orientation=0, max_size=None, *args, **kwargs): """ @@ -651,12 +696,12 @@ def start_recording(self, max_time=1800, output=None, fps=10, if self.recorder.is_running(): LOGGING.warning("recording is already running, please don't call again") return None - + logdir = "./" if not ST.LOG_DIR is None: logdir = ST.LOG_DIR if output is None: - save_path = os.path.join(logdir, "screen_%s.mp4"%(time.strftime("%Y%m%d%H%M%S", time.localtime()))) + save_path = os.path.join(logdir, "screen_%s.mp4" % (time.strftime("%Y%m%d%H%M%S", time.localtime()))) else: if os.path.isabs(output): save_path = output @@ -664,9 +709,14 @@ def start_recording(self, max_time=1800, output=None, fps=10, save_path = os.path.join(logdir, output) max_size = get_max_size(max_size) + def get_frame(): - frame = self.snapshot() - + try: + frame = self.snapshot() + except numpy.core._exceptions._ArrayMemoryError: + self.stop_recording() + raise Exception("memory error!!!!") + if max_size is not None: frame = resize_by_max(frame, max_size) return frame @@ -679,7 +729,7 @@ def get_frame(): LOGGING.info("start recording screen to {}, don't close or resize the app window".format(save_path)) return save_path - def stop_recording(self,): + def stop_recording(self): """ Stop recording the device display. Recoding file will be kept in the device. diff --git a/airtest/utils/logwraper.py b/airtest/utils/logwraper.py index 6ee4a391..5e1d1c7d 100644 --- a/airtest/utils/logwraper.py +++ b/airtest/utils/logwraper.py @@ -9,6 +9,7 @@ from copy import copy from .logger import get_logger from .snippet import reg_cleanup +from airtest.core.error import LocalDeviceError LOGGING = get_logger(__name__) @@ -125,10 +126,15 @@ def func1(snapshot=True): # The snapshot parameter is popped from the function parameter, # so the function cannot use the parameter name snapshot later snapshot = m.pop('snapshot', False) + m.pop('self', None) # remove self from the call_args + m.pop('cls', None) # remove cls from the call_args fndata = {'name': f.__name__, 'call_args': m, 'start_time': start} logger.running_stack.append(fndata) try: res = f(*args, **kwargs) + except LocalDeviceError: + # 为了进入airtools中的远程方法,同时不让LocalDeviceError在报告中显示为失败步骤 + raise LocalDeviceError except Exception as e: data = {"traceback": traceback.format_exc(), "end_time": time.time()} fndata.update(data) diff --git a/airtest/utils/version.py b/airtest/utils/version.py index ce5aeff0..f1250baf 100644 --- a/airtest/utils/version.py +++ b/airtest/utils/version.py @@ -1,4 +1,4 @@ -__version__ = "1.3.3.1" +__version__ = "1.3.5" import os import sys diff --git a/requirements.txt b/requirements.txt index 5f93f495..99e315da 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,7 @@ Pillow>=3.4.0 requests>=2.11.1 six>=1.9.0,<=1.16.0 mss==6.1.0 -numpy +numpy<2.0 opencv-contrib-python>=4.4.0.46, <=4.6.0.66 facebook-wda>=1.3.3 pywinauto==0.6.3 diff --git a/setup.py b/setup.py index 4de5ece0..52ee3c6a 100644 --- a/setup.py +++ b/setup.py @@ -36,6 +36,9 @@ def parse_requirements(filename): # if py<=3.6 add dataclasses if sys.version_info.major == 3 and sys.version_info.minor <= 6: reqs.append("dataclasses") + if sys.version_info.major == 3 and sys.version_info.minor <= 7: + reqs.remove("facebook-wda>=1.3.3") + reqs.append("facebook-wda<1.4.8") if is_docker(): reqs.remove("opencv-contrib-python>=4.4.0.46, <=4.6.0.66") reqs.append("opencv-contrib-python-headless==4.5.5.64") diff --git a/tests/test_adb.py b/tests/test_adb.py index fe6b39ca..826270f2 100644 --- a/tests/test_adb.py +++ b/tests/test_adb.py @@ -124,7 +124,7 @@ def test_push_file(file_path, des_path): print(des_file) self.assertIsNotNone(des_file) self.assertTrue(self.adb.exists_file(des_file)) - self.adb.shell("rm " + des_file) + self.adb.shell("rm -r \"" + des_file + "\"") tmpdir = "/data/local/tmp" test_push_file(IMG, tmpdir) @@ -132,6 +132,7 @@ def test_push_file(file_path, des_path): imgname = os.path.basename(IMG) tmpimgpath = tmpdir + "/" + imgname test_push_file(IMG, tmpimgpath) + test_push_file(IMG, tmpdir) # 测试空格+特殊字符+中文 test_space_img = os.path.join(os.path.dirname(IMG), "space " + imgname) @@ -146,6 +147,26 @@ def test_push_file(file_path, des_path): test_push_file(test_img, tmpdir + "/" + os.path.basename(test_img)) try_remove(test_img) + # 测试非临时目录(部分高版本手机有权限问题,不允许直接push) + dst_path = "/sdcard/Android/data/com.netease.nie.yosemite/files" + test_push_file(IMG, dst_path) + test_img = os.path.join(os.path.dirname(IMG), imgname + "中文 (1)") + shutil.copy(IMG, test_img) + test_push_file(test_img, dst_path) + + # 推送文件夹 /test push 到 目标路径 + os.makedirs("test push", exist_ok=True) + shutil.copy(IMG, "test push/" + imgname) + test_push_file("test push", dst_path) + test_push_file("test push", tmpdir) + shutil.rmtree("test push") + + # 推送文件夹 /test push 到 目标路径/test push + os.makedirs("test push", exist_ok=True) + shutil.copy(IMG, "test push/" + imgname) + test_push_file("test push", dst_path + "/test") + shutil.rmtree("test push") + def test_pull(self): tmpdir = "/data/local/tmp" imgname = os.path.basename(IMG) diff --git a/tests/test_ios.py b/tests/test_ios.py index eeebb612..a52deb95 100644 --- a/tests/test_ios.py +++ b/tests/test_ios.py @@ -1,5 +1,7 @@ # encoding=utf-8 +import hashlib import os +import shutil import time import unittest import numpy @@ -9,21 +11,26 @@ from .testconf import try_remove import cv2 import warnings +import tempfile + warnings.simplefilter("always") -text_flag = True # 控制是否运行text接口用例 +text_flag = True # 控制是否运行text接口用例 skip_alert_flag = False # 控制是否测试alert相关接口用例 DEFAULT_ADDR = "http://localhost:8100/" # iOS设备连接参数 PKG_SAFARI = "com.apple.mobilesafari" -TEST_IPA_FILE_OR_URL = "" # IPA包体的路径或者url链接,测试安装 -TEST_IPA_BUNDLE_ID = "" # IPA安装后app的bundleID,测试卸载 - -class TestIos(unittest.TestCase): +TEST_IPA_FILE_OR_URL = "" # IPA包体的路径或者url链接,测试安装 +TEST_IPA_BUNDLE_ID = "" # IPA安装后app的bundleID,测试卸载 + +class TestIos(unittest.TestCase): @classmethod def setUpClass(cls): - # cls.ios = IOS(addr=DEFAULT_ADDR, cap_method=CAP_METHOD.WDACAP) - cls.ios = connect_device("iOS:///http+usbmux://") + cls.ios = IOS(addr=DEFAULT_ADDR, cap_method=CAP_METHOD.WDACAP) + cls.TEST_FSYNC_APP = "" # 测试文件推送、同步的app的bundleID + # 获取一个可以用于文件操作的app + cls.TEST_FSYNC_APP = "com.apple.Keynote" + # cls.TEST_FSYNC_APP = "rn.notes.best" @classmethod def tearDownClass(cls): @@ -65,7 +72,7 @@ def test_snapshot(self): filename = "./screen.png" if os.path.exists(filename): os.remove(filename) - + screen = self.ios.snapshot(filename=filename) self.assertIsInstance(screen, numpy.ndarray) self.assertTrue(os.path.exists(filename)) @@ -81,11 +88,11 @@ def test_keyevent_home(self): with self.assertRaises(ValueError): self.ios.keyevent("home1") - + def test_keyevent_volume_up(self): print("test_keyevent_volume_up") self.ios.keyevent("voluMeup") - + def test_keyevent_volume_down(self): print("test_keyevent_volume_down") self.ios.keyevent("voluMeDown") @@ -191,7 +198,7 @@ def test_app_current(self): time.sleep(2) self.assertEqual(self.ios.app_current()["bundleId"], PKG_SAFARI) self.ios.stop_app(PKG_SAFARI) - + def test_get_ip_address(self): print("test_get_ip_address") print(self.ios.get_ip_address()) @@ -263,7 +270,7 @@ def test_touch_factor(self): print("display_info:", self.ios.display_info) print("default touch_factor:", self.ios.touch_factor) self.ios.touch((500, 500)) - self.ios.touch_factor = 1/3.3 + self.ios.touch_factor = 1 / 3.3 self.ios.touch((500, 500)) def test_disconnect(self): @@ -272,10 +279,10 @@ def test_disconnect(self): self.ios.get_frame_from_stream() self.ios.disconnect() self.assertEqual(len(self.ios.instruct_helper._port_using_func.keys()), 0) - + def test_record(self): self.ios.start_recording(output="test_10s.mp4") - time.sleep(10+4) + time.sleep(10 + 4) self.ios.stop_recording() time.sleep(2) self.assertEqual(os.path.exists("test_10s.mp4"), True) @@ -284,9 +291,9 @@ def test_record(self): if cap.isOpened(): rate = cap.get(5) frame_num = cap.get(7) - duration = frame_num/rate + duration = frame_num / rate self.assertEqual(duration >= 10, True) - + def test_list_app(self): print("test_list_app") app_list = self.ios.list_app(type="all") @@ -296,7 +303,7 @@ def test_list_app(self): def test_install_app(self): print("test_install_app") self.ios.install_app(TEST_IPA_FILE_OR_URL) - + def test_uninstall_app(self): print("test_uninstall_app") self.ios.uninstall_app(TEST_IPA_BUNDLE_ID) @@ -307,7 +314,7 @@ def test_get_clipboard(self): def test_set_clipboard(self): for i in range(10): - text = "test_set_clipboard"+str(i) + text = "test_set_clipboard" + str(i) self.ios.set_clipboard(text) self.assertEqual(self.ios.get_clipboard(), text) self.ios.paste() @@ -317,10 +324,358 @@ def test_set_clipboard(self): self.assertEqual(self.ios.get_clipboard(), text) self.ios.paste() + def test_ls(self): + print("test ls") + # ls /DCIM/ + dcim = self.ios.ls("/DCIM/") + print(dcim) + self.assertTrue(isinstance(dcim, list) and len(dcim) > 0) + self.assertTrue(isinstance(dcim[0], dict)) + + # ls app /Documents/ + with open("test_ls_file.txt", 'w') as f: + f.write('Test data') + self.ios.push("test_ls_file.txt", "/Documents/", self.TEST_FSYNC_APP) + file_list = self.ios.ls("/Documents/", self.TEST_FSYNC_APP) + self.assertTrue(isinstance(file_list, list)) + self.assertTrue(len(file_list) > 0) + self.assertTrue(isinstance(file_list[0], dict)) + self._try_remove_ios("/Documents/test_ls_file.txt", self.TEST_FSYNC_APP) + try_remove("test_ls_file.txt") + + def _try_remove_ios(self, file_name, bundle_id=None): + try: + self.ios.rm(file_name, bundle_id) + file_list = self.ios.ls(os.path.dirname(file_name), bundle_id) + for file in file_list: + if file['name'] == file_name: + raise Exception(f"remove file {file_name} failed") + print(f"file {file_name} not exist now.") + except: + print(f"not find {file_name}") + pass + + def test_push(self): + def _test_file(file_name, dst="/Documents/", bundle_id=self.TEST_FSYNC_APP, target=None): + try_remove(file_name) + with open(file_name, 'w') as f: + f.write('Test data') + + # 用来ls和rm的路径,没有将文件改名则默认为file_name + if not target: + tmp_dst = os.path.normpath(dst) + if os.path.basename(tmp_dst) != file_name: + tmp_dst = os.path.join(tmp_dst, file_name) + target = tmp_dst.replace('\\', '/') + + # 清理手机里的文件 + self._try_remove_ios(target, bundle_id) + self.ios.push(file_name, dst, bundle_id, timeout=60) + time.sleep(1) + file_list = self.ios.ls(target, bundle_id) + # 验证结果 + self.assertEqual(len(file_list), 1) + self.assertEqual(file_list[0]['name'], os.path.basename(target)) + self.assertEqual(file_list[0]['type'], 'File') + self._try_remove_ios(target, bundle_id) + time.sleep(1) + + # 清理 + try_remove(file_name) + + def _test_dir(dir_name, dst="/Documents/"): + print(f"test push directory {dir_name}") + # 用来ls和rm的路径 + tmp_dst = os.path.normpath(dst) + if os.path.basename(tmp_dst) != dir_name: + tmp_dst = os.path.join(tmp_dst, dir_name) + target = tmp_dst.replace('\\', '/') + + # 创建文件夹和文件 + try_remove(dir_name) + self._try_remove_ios(target, self.TEST_FSYNC_APP) + os.mkdir(dir_name) + with open(f'{dir_name}/test_data', 'w') as f: + f.write('Test data') + + self.ios.push(dir_name, dst, self.TEST_FSYNC_APP, timeout=60) + time.sleep(1) + + dir_list = self.ios.ls(os.path.dirname(target), self.TEST_FSYNC_APP) + print(dir_list) + self.assertTrue(f"{dir_name}/" in [item['name'] for item in dir_list]) + file_list = self.ios.ls(f"{target}/test_data", self.TEST_FSYNC_APP) + self.assertTrue("test_data" in [item['name'] for item in file_list]) + self._try_remove_ios(target, self.TEST_FSYNC_APP) + time.sleep(1) + + try_remove(dir_name) + + # 执行得太快会报错,可能和wda的处理速度有关 + # 如果报错尝试单独执行那些用例 + _test_file("test_data_1.txt", "/Documents/") + _test_file("test_data_2.txt", "/Documents/test_data_2.txt") + _test_file("test_data_3.txt", "/Documents/test_data_3.txt/") + # 重命名文件 + _test_file("test_data_4.txt", "/Documents/test_data.txt/", target="/Documents/test_data.txt") + _test_file("test_data.txt", "/Documents") + _test_file("test_1.png", "/DCIM", None) + _test_file("test_2.png", "/DCIM/", None) + _test_file("test_3.png", "/DCIM/test_3.png", None) + _test_file("test_4.png", "/DCIM/test_4.png/", None) + _test_file("test_5.png", "/DCIM/test.png/", None, target="/DCIM/test.png") + _test_file("test.png", "/DCIM/", None) + _test_file("t e s t d a t a.txt", "/Documents") + _test_file("测试文件.txt", "/Documents") + _test_file("测 试 文 件.txt", "/Documents") + _test_file("(){}[]~'-_@!#$%&+,;=^.txt", "/Documents") + _test_file("data", "/Documents") + + _test_dir('test_dir', "/Documents/") + _test_dir('test_dir_1', "/Documents") + _test_dir('t e s t d i r', "/Documents") + _test_dir("(){}[]~'-_@!#$%&+,;=^", "/Documents") + _test_dir('测试文件夹', "/Documents/") + _test_dir('测试文件夹_1', "/Documents") + _test_dir('测 试 文 件 夹', "/Documents") + + def test_pull(self): + def _get_file_md5(file_path): + hasher = hashlib.md5() + with open(file_path, 'rb') as f: + while chunk := f.read(8192): + hasher.update(chunk) + return hasher.hexdigest() + + def _get_folder_md5(folder_path): + md5_list = [] + for root, _, files in os.walk(folder_path): + for file in sorted(files): # Sort to maintain order + file_path = os.path.join(root, file) + file_md5 = _get_file_md5(file_path) + md5_list.append(file_md5) + + combined_md5 = hashlib.md5("".join(md5_list).encode()).hexdigest() + return combined_md5 + + def _test_file(file_name, bundle_id=self.TEST_FSYNC_APP, folder="/Documents"): + target = f"{folder}/{file_name}" + # 删除手机和本地存在的文件, + try_remove(file_name) + self._try_remove_ios(target, bundle_id=bundle_id) + + # 创建文件,推送文件 + with open(file_name, 'w') as f: + f.write('Test data') + md5 = _get_file_md5(file_name) + self.ios.push(file_name, f"{folder}/", bundle_id=bundle_id, timeout=60) + try_remove(file_name) + + # 下载文件 + print(f"test pull file {file_name}") + self.ios.pull(target, ".", bundle_id=bundle_id, timeout=60) + self.assertTrue(os.path.exists(file_name)) + self.assertEqual(md5, _get_file_md5(file_name)) + + # 下载、重命名文件 + self.ios.pull(target, "rename_file.txt", bundle_id=bundle_id, timeout=60) + self.assertTrue(os.path.exists("rename_file.txt")) + self.assertEqual(md5, _get_file_md5("rename_file.txt")) + + # 带文件夹路径 + os.mkdir("test_dir") + self.ios.pull(target, "test_dir", bundle_id=bundle_id, timeout=60) + self.assertTrue(os.path.exists(f"test_dir/{file_name}")) + self.assertEqual(md5, _get_file_md5(f"test_dir/{file_name}")) + + # 清理 + self._try_remove_ios(target, bundle_id) + try_remove(file_name) + try_remove("rename_file.txt") + try_remove("test_dir") + + def _test_dir(dir_name, bundle_id=self.TEST_FSYNC_APP, folder="/Documents"): + target = f"{folder}/{dir_name}" + # 删除手机和本地存在的文件夹,创建文件夹和文件 + try_remove(dir_name) + self._try_remove_ios(target, bundle_id=bundle_id) + os.mkdir(dir_name) + with open(f'{dir_name}/test_data', 'w') as f: + f.write('Test data') + md5 = _get_folder_md5(dir_name) + self.ios.push(dir_name, f"{folder}/", bundle_id=bundle_id, timeout=60) + time.sleep(1) + try_remove(dir_name) + + # 推送文件夹 + print(f"test pull directory {dir_name}") + os.mkdir(dir_name) + self.ios.pull(target, dir_name, bundle_id=bundle_id, timeout=60) + self.assertTrue(os.path.exists(f"{dir_name}/{dir_name}")) + self.assertEqual(md5, _get_folder_md5(f"{dir_name}/{dir_name}")) + + # 清理 + self._try_remove_ios(target, bundle_id=bundle_id) + time.sleep(1) + try_remove(dir_name) + + # 执行得太快会报错,可能和wda的处理速度有关 + # 如果报错尝试单独执行那些用例 + _test_file("test_data.txt") + _test_file("t e s t _ d a t a.txt") + _test_file("测试文件.txt") + _test_file("测 试 文 件.txt") + _test_file("(){}[]~'-_@!#$%&+,;=^.txt") + _test_file("data") + _test_file("data.png", bundle_id=None, folder="/DCIM") + + _test_dir('test_dir') + _test_dir('t e s t _ d i r') + _test_dir('测试文件夹') + _test_dir('测试文件夹.txt') + _test_dir('测 试 文 件 夹') + _test_dir("(){}[]~'-_@!#$%&+,;=^") + _test_dir('test_dir_no_bundle', bundle_id=None, folder="/DCIM") + + def test_rm(self): + def _test_file(file_name, bundle_id=self.TEST_FSYNC_APP, folder="/Documents"): + target = f"{folder}/{file_name}" + + # 删除手机和本地存在的文件,创建文件 + self._try_remove_ios(target, bundle_id) + with open(file_name, 'w') as f: + f.write('Test data') + + # 推送文件 + self.ios.push(file_name, f"{folder}/", bundle_id, timeout=60) + time.sleep(1) + try_remove(file_name) + file_list = self.ios.ls(target, bundle_id) + self.assertEqual(len(file_list), 1) + + # 删除文件 + print(f"test rm file {file_name}") + self.ios.rm(target, bundle_id) + file_list = self.ios.ls(folder, bundle_id) + for item in file_list: + if item['name'] == file_name: + raise Exception(f"remove {file_name} failed") + + def _test_dir(dir_name, bundle_id=self.TEST_FSYNC_APP, folder="/Documents"): + target = f"{folder}/{dir_name}" + + # 删除手机和本地存在的文件夹,创建文件夹和文件 + self._try_remove_ios(target, bundle_id) + os.mkdir(dir_name) + with open(f'{dir_name}/test_data', 'w') as f: + f.write('Test data') + + # 推送文件夹 + self.ios.push(dir_name, f"{folder}/", bundle_id, timeout=60) + time.sleep(1) + try_remove(dir_name) + + print(f"test rm directory {dir_name}") + file_list = self.ios.ls(folder, bundle_id) + for item in file_list: + if item['name'] == f"{dir_name}/": + break + else: + raise Exception(f"directory {dir_name} not exist") + + # 删除文件夹 + self.ios.rm(target, bundle_id) + file_list = self.ios.ls(folder, bundle_id) + self.assertTrue(f"{dir_name}/" not in [item['name'] for item in file_list]) + + # 执行得太快会报错,可能和wda的处理速度有关 + # 如果报错尝试单独执行那些用例 + _test_file("test_data.txt") + _test_file("t e s t _ d a t a.txt") + _test_file("测试文件.txt") + _test_file("测 试 文 件.txt") + _test_file("(){}[]~'-_@!#$%&+,;=^.txt") + _test_file("data") + _test_file("data.png", bundle_id=None, folder="/DCIM") + + _test_dir('test_dir') + _test_dir('t e s t _ d i r') + _test_dir('测试文件夹') + _test_dir('测试文件夹.txt') + _test_dir('测 试 文 件 夹') + _test_dir("(){}[]~'-_@!#$%&+,;=^") + _test_dir('test_dir_no_bundle', bundle_id=None, folder="/DCIM") + + def test_mkdir(self): + def _test(dir_name, bundle_id=self.TEST_FSYNC_APP, folder="/Documents"): + target = f"{folder}/{dir_name}" + + # 删除目标目录 + self._try_remove_ios(target, bundle_id) + + print("test mkdir") + + # 创建目录 + self.ios.mkdir(target, bundle_id) + + # 获取目标文件夹下的目录列表 + dirs = self.ios.ls(folder, bundle_id) + + # 检查新建的目录是否存在 + self.assertTrue(any(d['name'] == f"{dir_name}/" for d in dirs)) + + # 删除目标目录 + self._try_remove_ios(target, bundle_id) + + # 执行得太快会报错,可能和wda的处理速度有关 + # 如果报错尝试单独执行那些用例 + _test('test_dir') + _test('t e s t _ d i r') + _test('测试文件夹') + _test('测试文件夹.txt') + _test('测 试 文 件 夹') + _test("(){}[]~'-_@!#$%&+,;=^") + _test('test_dir_no_bundle', bundle_id=None, folder="/DCIM") + + def test_is_dir(self): + print("test is_dir") + + def create_and_push_file(local_name, remote_name, bundle_id): + with open(local_name, 'w') as f: + f.write('Test data') + self.ios.push(local_name, remote_name, bundle_id) + try_remove(local_name) + + def create_and_push_dir(local_name, remote_name, bundle_id): + os.makedirs(local_name) + self.ios.push(local_name, remote_name, bundle_id) + try_remove(local_name) + + # 测试文件 + file_path = "/Documents/test_data.txt" + self._try_remove_ios(file_path, self.TEST_FSYNC_APP) + create_and_push_file("test_data.txt", "/Documents/", self.TEST_FSYNC_APP) + self.assertFalse(self.ios.is_dir(file_path, self.TEST_FSYNC_APP)) + self._try_remove_ios(file_path, self.TEST_FSYNC_APP) + + # 测试文件夹 + dir_path = "/Documents/test_dir" + create_and_push_dir("test_dir", "/Documents/", self.TEST_FSYNC_APP) + self.assertTrue(self.ios.is_dir(dir_path, self.TEST_FSYNC_APP)) + self._try_remove_ios(dir_path, self.TEST_FSYNC_APP) + + # 测试另外一个文件夹 + file_path_dcim = "/DCIM/test.png" + self._try_remove_ios(file_path_dcim, None) + create_and_push_file("test_data.txt", "/DCIM/test.png", None) + self.assertTrue(self.ios.is_dir("/DCIM", None)) + self.assertFalse(self.ios.is_dir(file_path_dcim, None)) + self._try_remove_ios(file_path_dcim, None) + if __name__ == '__main__': # unittest.main() - #构造测试集 + # 构造测试集 suite = unittest.TestSuite() # 初始化相关信息 suite.addTest(TestIos("test_session")) @@ -339,6 +694,12 @@ def test_set_clipboard(self): suite.addTest(TestIos("test_touch")) suite.addTest(TestIos("test_swipe")) suite.addTest(TestIos("test_double_click")) + suite.addTest(TestIos("test_ls")) + suite.addTest(TestIos("test_push")) + suite.addTest(TestIos("test_pull")) + suite.addTest(TestIos("test_mkdir")) + suite.addTest(TestIos("test_rm")) + suite.addTest(TestIos("test_is_dir")) # 联合接口,顺序测试:解锁屏、应用启动关闭 suite.addTest(TestIos("test_is_locked")) suite.addTest(TestIos("test_lock")) @@ -364,6 +725,6 @@ def test_set_clipboard(self): suite.addTest(TestIos("get_alert_accept")) suite.addTest(TestIos("get_alert_click")) - #执行测试 + # 执行测试 runner = unittest.TextTestRunner() runner.run(suite)