import os import subprocess import sys import telnetlib import threading import time import uiautomator2 import uiautomator2 as u2 import yaml from task.task_dict import script_status from tools import loggerKit from tools.common_util import extract_latency, check_network_quality from tools.device_status import DeviceStatus # 读取 YAML 文件 with open('config.yaml', 'r') as file: config = yaml.load(file, Loader=yaml.FullLoader) wework_package = config['wework']['package'] # net_domain = 'http://api-qa.risingauto.net/' net_domain = config['bmp-cp']['net_domain'] # extern_domain = 'https://api-qa.risingauto.com/' extern_domain = config['bmp-cp']['extern_domain'] # 获取连接设备 def get_local_devices(): # 执行 adb 命令并获取输出 result = subprocess.run(['adb', 'devices'], capture_output=True, text=True) # 解析输出,获取已连接设备列表 output_lines = result.stdout.strip().split('\n') devices = [] for line in output_lines[1:]: if '\tdevice' in line: device_info = line.split('\t')[0] devices.append(device_info) # 输出已连接设备列表 if devices: loggerKit.info("已连接设备:") for device in devices: loggerKit.info(device) else: loggerKit.info("没有已连接的设备") return devices # 检验设备是否连接成功 def check_device_connect(device): devices = get_local_devices() if device in devices: loggerKit.info(device + "已连接") return True else: loggerKit.info(device + "未连接") return False # 判断网络代理端口是否是通的 def check_telnet(host, port): ''' :param host: host :param port: port :return: ''' try: # 创建Telnet对象 tn = telnetlib.Telnet(host, port, timeout=10) tn.close() return True except ConnectionRefusedError: return False except TimeoutError: return False except Exception as e: print("An error occurred:", e) return False ''' 通用手机截图 ''' def cap_device(d, device, task_id): ''' :param d: uiautomator2 对象 :param device: 设备id :param task_id: 任务id :return: ''' try: app_dir = os.path.dirname(sys.argv[0]) screenshot_path = os.path.join(app_dir, f'screen/{device}/') tmp_path = screenshot_path.replace(":", "_").replace(".", "_") # 检查路径是否存在 if not os.path.exists(tmp_path): # 创建多级目录 os.makedirs(os.path.dirname(tmp_path)) file_path = os.path.join(tmp_path, f'{device}_{task_id}_{time.time()}.png') d.screenshot().save(file_path) loggerKit.info(f'设备:{device} 执行任务出现异常,截图保存:{file_path}, tmp_path:{tmp_path}') except Exception as e: loggerKit.error(f'设备:{device} 执行任务出现异常后截图保存发生异常:{str(e)}') ''' check 手机设备状态 ''' def check_single_adb_connect(device, mobile): target_url = extern_domain.replace("https://", "").replace("http://", "").replace("/", "") cmd_conn = f"adb connect {device}" conn_output = subprocess.check_output(cmd_conn, shell=True) loggerKit.info(f"{conn_output}") if b'Connection refused' in conn_output: loggerKit.info('手机待激活{0}->{1},不能领取任务', device, mobile) return DeviceStatus.Mobile_Phone_To_Be_Activated elif b'already connected' in conn_output: # 再次check 设备的 atx agent 服务状态 w = uiautomator2.connect(device) # check atx agent 服务状态 if w.agent_alive: loggerKit.info('atx-agent 服务正常, atx 已连接{0}->{1}, 再次成功连接', device, mobile) else: # 启动atx-agent服务,防止服务手动误关闭 start_atx_agent = f"adb -s {device} shell /data/local/tmp/atx-agent server -d --nouia " opts_output = subprocess.check_output(start_atx_agent, shell=True) loggerKit.info(f"启动设备{device}->{mobile} atx agent 服务, {opts_output}, 再次成功连接") # check 设备是否在线 cmd_show = f"adb devices -l | grep {device}" show_output = subprocess.check_output(cmd_show, shell=True) loggerKit.info(f"设备{device}->{mobile}, {show_output}") if b"device product" in show_output: loggerKit.info("设备在线{0}->{1}, 可以领取任务", device, mobile) d = uiautomator2.connect(device) loggerKit.info("{0}->{1}=>{2}", d.uiautomator.running(), device, mobile) if d.uiautomator.running(): # loggerKit.info('atx 连接正常{0}->{1}, uiautomator2 服务正常, 开始判断设备网络质量', device, mobile) loggerKit.info('atx 连接正常{0}, uiautomator2 服务正常', device) # 判断手机端网络质量 cmd_ping = f"adb -s " + device + " shell ping -c 6 " + target_url wifi_output = subprocess.check_output(cmd_ping, shell=True) byte_array = bytearray(wifi_output) byte_string = bytes(byte_array) # Convert byte string to string wifi_str = byte_string.decode('utf-8') latencies = extract_latency(wifi_str, device=device, mobile=mobile) network_quality = check_network_quality(latencies, device=device, mobile=mobile) match network_quality: case DeviceStatus.Device_Net_Good: loggerKit.info('atx 连接正常{0}, uiautomator2 服务正常, 设备网络质量良好, 可以领取任务', device) return DeviceStatus.Available_For_Task case DeviceStatus.Device_Net_Common: loggerKit.info('atx 连接正常{0}, uiautomator2 服务正常, 设备网络质量一般, 可以领取任务', device) return DeviceStatus.Available_For_Task case DeviceStatus.Device_Net_Bad: loggerKit.info('atx 连接正常{0}, uiautomator2 服务正常, 设备网络质量较差, 不能领取任务', device) return DeviceStatus.Device_Await_Recover if network_quality == DeviceStatus.Device_Net_Good: loggerKit.info('atx 连接正常{0}->{1}, uiautomator2 服务正常, 设备网络质量良好, 可以领取任务', device, mobile) return DeviceStatus.Available_For_Task elif network_quality == DeviceStatus.Device_Net_Common: loggerKit.info('atx 连接正常{0}->{1}, uiautomator2 服务正常, 设备网络质量一般, 可以领取任务', device, mobile) return DeviceStatus.Available_For_Task else: loggerKit.info('atx 连接正常{0}->{1}, uiautomator2 服务正常, 设备网络质量较差, 不能领取任务', device, mobile) return DeviceStatus.Device_Await_Recover return DeviceStatus.Available_For_Task else: loggerKit.info('atx 连接正常 {0}->{1}, uiautomator2 服务待人工启动, 不能领取任务', device, mobile) time.sleep(1) return DeviceStatus.To_Be_Manually_Started elif b"offline product" in show_output: loggerKit.info("设备不在线{0}->{1}, 不能领取任务", device, mobile) return DeviceStatus.Device_Offline else: loggerKit.info("{0}->{1}=>{2}", device, mobile, show_output) return DeviceStatus.Device_Offline elif b'connected to' in conn_output: u = uiautomator2.connect(device) # check atx agent 服务状态 if u.agent_alive: loggerKit.info('atx-agent 服务正常, atx 已连接{0}->{1}, 首次成功连接', device, mobile) else: # 启动atx-agent服务,防止服务手动误关闭 cmd_start_atx_agent = f"adb -s {device} shell /data/local/tmp/atx-agent server -d --nouia " opt_output = subprocess.check_output(cmd_start_atx_agent, shell=True) loggerKit.info(f"启动设备{device}->{mobile} atx agent 服务, {opt_output}, 首次成功连接") return DeviceStatus.First_Success_Connect else: loggerKit('atx已连接{0}->{1}, 首次成功连接', device, mobile) return DeviceStatus.First_Success_Connect def reboot_device(device_serial, exception): # 连接设备 device = u2.connect(device_serial) device.healthcheck() device.screen_on() device.unlock() device.debug = False # 重启企业微信 device.app_stop("com.tencent.wework") device.app_start("com.tencent.wework") loggerKit.info("异常, 重启设备 {0}, 异常原因:{1}", device_serial, exception) # 机器人操作重试机制 def retry(operation, device_serial, content, retries=3, retry_interval=1): for i in range(retries): try: return operation() except Exception as e: if i == retries - 1: # 如果已经重试了retries次,那么抛出异常 raise e time.sleep(retry_interval) # 等待1秒再重试 loggerKit.info("thread[{0}=>{1}], 设备号:{2}, 触发重试机制, 已经重试了{3}次", threading.current_thread().name, threading.get_ident(), device_serial, i) status = script_status(0, 500, content, '') reboot_device(device_serial, content) loggerKit.error("thread[{0}=>{1}], 设备号:{2}, {3}", threading.current_thread().name, threading.get_ident(), device_serial, content) return status