||
- import os
- import subprocess
- import sys
- import telnetlib
- import threading
- import time
- import uiautomator2
- import uiautomator2 as u2
- import yaml
- from task.task_dict import script_status
- from tools import loggerKit
- from tools.common_util import extract_latency, check_network_quality
- from tools.device_status import DeviceStatus
- # 读取 YAML 文件
- with open('config.yaml', 'r') as file:
- config = yaml.load(file, Loader=yaml.FullLoader)
- wework_package = config['wework']['package']
- # net_domain = 'http://api-qa.risingauto.net/'
- net_domain = config['bmp-cp']['net_domain']
- # extern_domain = 'https://api-qa.risingauto.com/'
- extern_domain = config['bmp-cp']['extern_domain']
- # 获取连接设备
- def get_local_devices():
- # 执行 adb 命令并获取输出
- result = subprocess.run(['adb', 'devices'], capture_output=True, text=True)
- # 解析输出,获取已连接设备列表
- output_lines = result.stdout.strip().split('\n')
- devices = []
- for line in output_lines[1:]:
- if '\tdevice' in line:
- device_info = line.split('\t')[0]
- devices.append(device_info)
- # 输出已连接设备列表
- if devices:
- loggerKit.info("已连接设备:")
- for device in devices:
- loggerKit.info(device)
- else:
- loggerKit.info("没有已连接的设备")
- return devices
- # 检验设备是否连接成功
- def check_device_connect(device):
- devices = get_local_devices()
- if device in devices:
- loggerKit.info(device + "已连接")
- return True
- else:
- loggerKit.info(device + "未连接")
- return False
- # 判断网络代理端口是否是通的
- def check_telnet(host, port):
- '''
- :param host: host
- :param port: port
- :return:
- '''
- try:
- # 创建Telnet对象
- tn = telnetlib.Telnet(host, port, timeout=10)
- tn.close()
- return True
- except ConnectionRefusedError:
- return False
- except TimeoutError:
- return False
- except Exception as e:
- print("An error occurred:", e)
- return False
- '''
- 通用手机截图
- '''
- def cap_device(d, device, task_id):
- '''
- :param d: uiautomator2 对象
- :param device: 设备id
- :param task_id: 任务id
- :return:
- '''
- try:
- app_dir = os.path.dirname(sys.argv[0])
- screenshot_path = os.path.join(app_dir, f'screen/{device}/')
- tmp_path = screenshot_path.replace(":", "_").replace(".", "_")
- # 检查路径是否存在
- if not os.path.exists(tmp_path):
- # 创建多级目录
- os.makedirs(os.path.dirname(tmp_path))
- file_path = os.path.join(tmp_path, f'{device}_{task_id}_{time.time()}.png')
- d.screenshot().save(file_path)
- loggerKit.info(f'设备:{device} 执行任务出现异常,截图保存:{file_path}, tmp_path:{tmp_path}')
- except Exception as e:
- loggerKit.error(f'设备:{device} 执行任务出现异常后截图保存发生异常:{str(e)}')
- '''
- check 手机设备状态
- '''
- def check_single_adb_connect(device, mobile):
- target_url = extern_domain.replace("https://", "").replace("http://", "").replace("/", "")
- cmd_conn = f"adb connect {device}"
- conn_output = subprocess.check_output(cmd_conn, shell=True)
- loggerKit.info(f"{conn_output}")
- if b'Connection refused' in conn_output:
- loggerKit.info('手机待激活{0}->{1},不能领取任务', device, mobile)
- return DeviceStatus.Mobile_Phone_To_Be_Activated
- elif b'already connected' in conn_output:
- # 再次check 设备的 atx agent 服务状态
- w = uiautomator2.connect(device)
- # check atx agent 服务状态
- if w.agent_alive:
- loggerKit.info('atx-agent 服务正常, atx 已连接{0}->{1}, 再次成功连接', device, mobile)
- else:
- # 启动atx-agent服务,防止服务手动误关闭
- start_atx_agent = f"adb -s {device} shell /data/local/tmp/atx-agent server -d --nouia "
- opts_output = subprocess.check_output(start_atx_agent, shell=True)
- loggerKit.info(f"启动设备{device}->{mobile} atx agent 服务, {opts_output}, 再次成功连接")
- # check 设备是否在线
- cmd_show = f"adb devices -l | grep {device}"
- show_output = subprocess.check_output(cmd_show, shell=True)
- loggerKit.info(f"设备{device}->{mobile}, {show_output}")
- if b"device product" in show_output:
- loggerKit.info("设备在线{0}->{1}, 可以领取任务", device, mobile)
- d = uiautomator2.connect(device)
- loggerKit.info("{0}->{1}=>{2}", d.uiautomator.running(), device, mobile)
- if d.uiautomator.running():
- # loggerKit.info('atx 连接正常{0}->{1}, uiautomator2 服务正常, 开始判断设备网络质量', device, mobile)
- loggerKit.info('atx 连接正常{0}, uiautomator2 服务正常', device)
- # 判断手机端网络质量
- cmd_ping = f"adb -s " + device + " shell ping -c 6 " + target_url
- wifi_output = subprocess.check_output(cmd_ping, shell=True)
- byte_array = bytearray(wifi_output)
- byte_string = bytes(byte_array)
- # Convert byte string to string
- wifi_str = byte_string.decode('utf-8')
- latencies = extract_latency(wifi_str, device=device, mobile=mobile)
- network_quality = check_network_quality(latencies, device=device, mobile=mobile)
- match network_quality:
- case DeviceStatus.Device_Net_Good:
- loggerKit.info('atx 连接正常{0}, uiautomator2 服务正常, 设备网络质量良好, 可以领取任务', device)
- return DeviceStatus.Available_For_Task
- case DeviceStatus.Device_Net_Common:
- loggerKit.info('atx 连接正常{0}, uiautomator2 服务正常, 设备网络质量一般, 可以领取任务', device)
- return DeviceStatus.Available_For_Task
- case DeviceStatus.Device_Net_Bad:
- loggerKit.info('atx 连接正常{0}, uiautomator2 服务正常, 设备网络质量较差, 不能领取任务', device)
- return DeviceStatus.Device_Await_Recover
- if network_quality == DeviceStatus.Device_Net_Good:
- loggerKit.info('atx 连接正常{0}->{1}, uiautomator2 服务正常, 设备网络质量良好, 可以领取任务',
- device, mobile)
- return DeviceStatus.Available_For_Task
- elif network_quality == DeviceStatus.Device_Net_Common:
- loggerKit.info('atx 连接正常{0}->{1}, uiautomator2 服务正常, 设备网络质量一般, 可以领取任务',
- device, mobile)
- return DeviceStatus.Available_For_Task
- else:
- loggerKit.info('atx 连接正常{0}->{1}, uiautomator2 服务正常, 设备网络质量较差, 不能领取任务',
- device, mobile)
- return DeviceStatus.Device_Await_Recover
- return DeviceStatus.Available_For_Task
- else:
- loggerKit.info('atx 连接正常 {0}->{1}, uiautomator2 服务待人工启动, 不能领取任务', device, mobile)
- time.sleep(1)
- return DeviceStatus.To_Be_Manually_Started
- elif b"offline product" in show_output:
- loggerKit.info("设备不在线{0}->{1}, 不能领取任务", device, mobile)
- return DeviceStatus.Device_Offline
- else:
- loggerKit.info("{0}->{1}=>{2}", device, mobile, show_output)
- return DeviceStatus.Device_Offline
- elif b'connected to' in conn_output:
- u = uiautomator2.connect(device)
- # check atx agent 服务状态
- if u.agent_alive:
- loggerKit.info('atx-agent 服务正常, atx 已连接{0}->{1}, 首次成功连接', device, mobile)
- else:
- # 启动atx-agent服务,防止服务手动误关闭
- cmd_start_atx_agent = f"adb -s {device} shell /data/local/tmp/atx-agent server -d --nouia "
- opt_output = subprocess.check_output(cmd_start_atx_agent, shell=True)
- loggerKit.info(f"启动设备{device}->{mobile} atx agent 服务, {opt_output}, 首次成功连接")
- return DeviceStatus.First_Success_Connect
- else:
- loggerKit('atx已连接{0}->{1}, 首次成功连接', device, mobile)
- return DeviceStatus.First_Success_Connect
- def reboot_device(device_serial, exception):
- # 连接设备
- device = u2.connect(device_serial)
- device.healthcheck()
- device.screen_on()
- device.unlock()
- device.debug = False
- # 重启企业微信
- device.app_stop("com.tencent.wework")
- device.app_start("com.tencent.wework")
- loggerKit.info("异常, 重启设备 {0}, 异常原因:{1}", device_serial, exception)
- # 机器人操作重试机制
- def retry(operation, device_serial, content, retries=3, retry_interval=1):
- for i in range(retries):
- try:
- return operation()
- except Exception as e:
- if i == retries - 1: # 如果已经重试了retries次,那么抛出异常
- raise e
- time.sleep(retry_interval) # 等待1秒再重试
- loggerKit.info("thread[{0}=>{1}], 设备号:{2}, 触发重试机制, 已经重试了{3}次",
- threading.current_thread().name, threading.get_ident(),
- device_serial, i)
- status = script_status(0, 500, content, '')
- reboot_device(device_serial, content)
- loggerKit.error("thread[{0}=>{1}], 设备号:{2}, {3}", threading.current_thread().name,
- threading.get_ident(), device_serial, content)
- return status
|