""" 通用工具类 """ import math import os import re import socket import subprocess import sys import telnetlib import threading import time import requests import uiautomator2 import yaml from flask import json from tools import loggerKit from tools.device_status import DeviceStatus # 读取 YAML 文件 with open('config.yaml', 'r') as file: config = yaml.load(file, Loader=yaml.FullLoader) # wework_package = config['wework']['package'] # net_domain = 'http://api-qa.risingauto.net/' net_domain = config['bmp-cp']['net_domain'] # extern_domain = 'https://api-qa.risingauto.com/' extern_domain = config['bmp-cp']['extern_domain'] def get_ip_info(ip): response = requests.get('http://whois.pconline.com.cn/ipJson.jsp?ip=%s&json=true' % (ip)) if response.ok: print(response.text) data = json.loads(response.text) pro = data['pro'] city = data['city'] addr = data['addr'] print('%s, %s, %s', pro, city, addr) def get_city(ip): response = requests.get('http://whois.pconline.com.cn/ipJson.jsp?ip=%s&json=true' % (ip)) if response.ok: data = json.loads(response.text) city = data['city'] return city else: return None """ 获取本地IP """ def get_local_ip(): ip = None s = None try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(('8.8.8.8', 80)) ip = s.getsockname()[0] finally: s.close() return ip # 获取连接设备 def get_local_devices(): # 执行 adb 命令并获取输出 result = subprocess.run(['adb', 'devices'], capture_output=True, text=True) # 解析输出,获取已连接设备列表 output_lines = result.stdout.strip().split('\n') devices = [] for line in output_lines[1:]: if '\tdevice' in line: device_info = line.split('\t')[0] devices.append(device_info) # 输出已连接设备列表 if devices: print("已连接设备:") for device in devices: print(device) else: print("没有已连接的设备") return devices # 检验设备是否连接成功 def check_device_connect(device): devices = get_local_devices() if device in devices: print(device + "已连接") return True else: print(device + "未连接") return False def is_app_running(device_id, package_name): # 执行 adb shell pidof 命令检测应用程序是否运行 command = ['adb', '-s', device_id, 'shell', 'pidof', package_name] result = subprocess.run(command, capture_output=True, text=True) # 检查命令执行结果 if result.returncode == 0 and result.stdout.strip().isdigit(): return True else: return False # 检查app运行状态 def check_app_running(device_id, package_name): # 执行 adb shell pidof 命令检测应用程序是否运行 try: command = ['adb', '-s', device_id, 'shell', 'pidof', package_name] result = subprocess.check_output(command, stderr=subprocess.STDOUT, text=True) return result.strip().isdigit() except subprocess.CalledProcessError as e: loggerKit.error("Failed to check if app {0} is running on device {1}, 异常信息:{2}", package_name, device_id, e.output) return True # check app运行状态 def check_app_status(device_id, package_name): try: # Execute adb command to check if the app process is running command = ['adb', '-s', device_id, 'shell', 'pidof', package_name] result = subprocess.run(command, capture_output=True, text=True, check=True) # Check the return code to determine if the app process is running if result.returncode == 0: return True else: loggerKit.error("app {0} 未运行 on device {1}", package_name, device_id) return False except subprocess.CalledProcessError as e: loggerKit.error("Failed to check if app {0} is running on device {1}, 异常信息:{2}", package_name, device_id, e) return False # 判断设备网络状态 def check_mobile_network(device_id): try: # 使用 adb 命令检查设备是否有网络连接 请求超时时间设置1s output = subprocess.check_output( ["adb", "-s", device_id, "shell", "ping", "-c", "1", "-W", "1", "api-qa.risingauto.com"]) loggerKit.info("thread[{0}=>{1}], 设备号:{2}, 网络正常,返回结果:{3}", threading.current_thread().name, threading.get_ident(), device_id, output) return True except subprocess.CalledProcessError as e: loggerKit.error("thread[{0}=>{1}], 设备号:{2}, 网络异常,返回结果:{3}", threading.current_thread().name, threading.get_ident(), device_id, e.output) return False # 检查设备网络状态 def check_network_status(device_id): try: ping_domain = extern_domain.replace("https://", "").replace("http://", "").replace("/", "") command = "adb -s " + device_id + " shell ping -c 3 " + ping_domain print(f'command: {command}') output = subprocess.check_output(command, shell=True, timeout=4).decode('utf-8') if '3 received' in output or '3 packets received' in output: loggerKit.info("thread[{0}=>{1}], 设备号:{2}, 网络正常,返回结果:{3}", threading.current_thread().name, threading.get_ident(), device_id, output) return True elif '100% packet loss' in output: loggerKit.info("thread[{0}=>{1}], 设备号:{2}, 网络不可用, 100%丢包率, 返回结果:{3}", threading.current_thread().name, threading.get_ident(), device_id, output) return False else: loggerKit.info("thread[{0}=>{1}], 设备号:{2}, 出现部分丢包情况, 返回结果:{3}", threading.current_thread().name, threading.get_ident(), device_id, output) return True except subprocess.CalledProcessError as e: loggerKit.error("thread[{0}=>{1}], 设备号:{2}, ping出现异常,返回结果:{3}", threading.current_thread().name, threading.get_ident(), device_id, e.output) return False def is_number(input_str): if input_str.isdigit(): return True else: try: float(input_str) return True except ValueError: return False def contains_digit(string): for char in string: if char.isdigit(): return True return False def delete_image(image_path): if os.path.exists(image_path): os.remove(image_path) # print(f"Image {image_path} deleted successfully.") else: print(f"Image {image_path} does not exist") # 判断参数是否错误 错误返回True def param_error(param): loggerKit.info("开始OCR参数对比:{0}", param) if '万' in param: param = param.replace('万', '') if '%' in param: param = param.replace('%', '') if ',' in param: param = param.replace(',', '') if ',' in param: param = param.replace(',', '') non_numeric_chars = re.compile(r'[^0-9.]') if non_numeric_chars.search(param): loggerKit.info("OCR参数对比有异常:{0}", param) return True else: loggerKit.info("OCR参数对比无异常:{0}", param) return False ''' 判断字符串是否包含 ''' def contains_substring(string, substring): if substring in string: return True else: return False # 判断网络代理端口是否是通的 def check_telnet(host, port): ''' :param host: host :param port: port :return: ''' try: # 创建Telnet对象 tn = telnetlib.Telnet(host, port, timeout=10) tn.close() return True except ConnectionRefusedError: return False except TimeoutError: return False except Exception as e: print("An error occurred:", e) return False ''' 通用手机截图 ''' def cap_device(d, device, task_id): ''' :param d: uiautomator2 对象 :param device: 设备id :param task_id: 任务id :return: ''' try: app_dir = os.path.dirname(sys.argv[0]) screenshot_path = os.path.join(app_dir, f'screen/{device}/') tmp_path = screenshot_path.replace(":", "_").replace(".", "_") # 检查路径是否存在 if not os.path.exists(tmp_path): # 创建多级目录 os.makedirs(os.path.dirname(tmp_path)) file_path = os.path.join(tmp_path, f'{device}_{task_id}_{time.time()}.png') d.screenshot().save(file_path) loggerKit.info(f'设备:{device} 执行任务出现异常,截图保存:{file_path}, tmp_path:{tmp_path}') except Exception as e: loggerKit.error(f'设备:{device} 执行任务出现异常后截图保存发生异常:{str(e)}') ''' check 手机设备状态 ''' def check_single_adb_connect(device, mobile): target_url = extern_domain.replace("https://", "").replace("http://", "").replace("/", "") cmd_conn = f"adb connect {device}" conn_output = subprocess.check_output(cmd_conn, shell=True) loggerKit.info(f"{conn_output}") if b'Connection refused' in conn_output: loggerKit.info('手机待激活{0}->{1},不能领取任务', device, mobile) return DeviceStatus.Mobile_Phone_To_Be_Activated elif b'already connected' in conn_output: # 再次check 设备的 atx agent 服务状态 w = uiautomator2.connect(device) # check atx agent 服务状态 if w.agent_alive: loggerKit.info('atx-agent 服务正常, atx 已连接{0}->{1}, 再次成功连接', device, mobile) else: # 启动atx-agent服务,防止服务手动误关闭 start_atx_agent = f"adb -s {device} shell /data/local/tmp/atx-agent server -d --nouia " opts_output = subprocess.check_output(start_atx_agent, shell=True) loggerKit.info(f"启动设备{device}->{mobile} atx agent 服务, {opts_output}, 再次成功连接") # check 设备是否在线 cmd_show = f"adb devices -l | grep {device}" show_output = subprocess.check_output(cmd_show, shell=True) loggerKit.info(f"设备{device}->{mobile}, {show_output}") if b"device product" in show_output: loggerKit.info("设备在线{0}->{1}, 可以领取任务", device, mobile) d = uiautomator2.connect(device) loggerKit.info("{0}->{1}=>{2}", d.uiautomator.running(), device, mobile) if d.uiautomator.running(): # loggerKit.info('atx 连接正常{0}->{1}, uiautomator2 服务正常, 开始判断设备网络质量', device, mobile) loggerKit.info('atx 连接正常{0}, uiautomator2 服务正常', device) # 判断手机端网络质量 # cmd_ping = f"adb -s " + device + " shell ping -c 6 " + target_url # wifi_output = subprocess.check_output(cmd_ping, shell=True) # # byte_array = bytearray(wifi_output) # byte_string = bytes(byte_array) # # # Convert byte string to string # wifi_str = byte_string.decode('utf-8') # # latencies = extract_latency(wifi_str, device=device, mobile=mobile) # network_quality = check_network_quality(latencies, device=device, mobile=mobile) # match network_quality: # case DeviceStatus.Device_Net_Good: # loggerKit.info('atx 连接正常{0}, uiautomator2 服务正常, 设备网络质量良好, 可以领取任务', device) # return DeviceStatus.Available_For_Task # case DeviceStatus.Device_Net_Common: # loggerKit.info('atx 连接正常{0}, uiautomator2 服务正常, 设备网络质量一般, 可以领取任务', device) # return DeviceStatus.Available_For_Task # case DeviceStatus.Device_Net_Bad: # loggerKit.info('atx 连接正常{0}, uiautomator2 服务正常, 设备网络质量较差, 不能领取任务', device) # return DeviceStatus.Device_Await_Recover # if network_quality == DeviceStatus.Device_Net_Good: # loggerKit.info('atx 连接正常{0}->{1}, uiautomator2 服务正常, 设备网络质量良好, 可以领取任务', # device, mobile) # return DeviceStatus.Available_For_Task # elif network_quality == DeviceStatus.Device_Net_Common: # loggerKit.info('atx 连接正常{0}->{1}, uiautomator2 服务正常, 设备网络质量一般, 可以领取任务', # device, mobile) # return DeviceStatus.Available_For_Task # else: # loggerKit.info('atx 连接正常{0}->{1}, uiautomator2 服务正常, 设备网络质量较差, 不能领取任务', # device, mobile) # return DeviceStatus.Device_Await_Recover return DeviceStatus.Available_For_Task else: loggerKit.info('atx 连接正常 {0}->{1}, uiautomator2 服务待人工启动, 不能领取任务', device, mobile) time.sleep(1) return DeviceStatus.To_Be_Manually_Started elif b"offline product" in show_output: loggerKit.info("设备不在线{0}->{1}, 不能领取任务", device, mobile) return DeviceStatus.Device_Offline else: loggerKit.info("{0}->{1}=>{2}", device, mobile, show_output) return DeviceStatus.Device_Offline elif b'connected to' in conn_output: u = uiautomator2.connect(device) # check atx agent 服务状态 if u.agent_alive: loggerKit.info('atx-agent 服务正常, atx 已连接{0}->{1}, 首次成功连接', device, mobile) else: # 启动atx-agent服务,防止服务手动误关闭 cmd_start_atx_agent = f"adb -s {device} shell /data/local/tmp/atx-agent server -d --nouia " opt_output = subprocess.check_output(cmd_start_atx_agent, shell=True) loggerKit.info(f"启动设备{device}->{mobile} atx agent 服务, {opt_output}, 首次成功连接") return DeviceStatus.First_Success_Connect else: loggerKit('atx已连接{0}->{1}, 首次成功连接', device, mobile) return DeviceStatus.First_Success_Connect def check_wifi_info(device, mobile): target_url = extern_domain.replace("https://", "").replace("http://", "").replace("/", "") cmd_conn = f"adb connect {device}" conn_output = subprocess.check_output(cmd_conn, shell=True) loggerKit.info(f"设备:{device} => {conn_output}") cmd_ping = f"adb -s " + device + " shell ping -c 6 " + target_url wifi_output = subprocess.check_output(cmd_ping, shell=True) loggerKit.info(f"设备:{device} => {wifi_output}") byte_array = bytearray(wifi_output) byte_string = bytes(byte_array) # Convert byte string to string wifi_str = byte_string.decode('utf-8') latencies = extract_latency(wifi_str, device, mobile) network_quality = check_network_quality(latencies, device, mobile) print(f"设备:{device} => {network_quality}") # check 远程端口是否是通的 def check_remote_port(host, port): try: # 创建socket对象 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(5) # 尝试连接远程主机的指定端口 result = sock.connect_ex((host, port)) sock.close() # 根据连接结果判断端口是否开放 if result == 0: return True else: return False except Exception as e: loggerKit.error('check_remote_port, exception:{0}', e) return False # 计算距离 def get_distance(lat1, lon1, lat2, lon2): r = 6371000 dLat = (lat2 - lat1) * math.pi / 180 dLon = (lon2 - lon1) * math.pi / 180 a = math.sin(dLat / 2) * math.sin(dLat / 2) + math.cos(lat1 * math.pi / 180) * math.cos( lat2 * math.pi / 180) * math.sin(dLon / 2) * math.sin(dLon / 2) c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) d = r * c return d ''' 提取延迟时间 ''' def extract_latency(ping_result, device, mobile): latency_pattern = r"time=(\d+\.\d+)" latencies = re.findall(latency_pattern, ping_result) loggerKit.info("设备{0}->{1}, ping_result=>{2}, latencies=>{3}", device, mobile, ping_result, latencies) return [float(latency) for latency in latencies] ''' 判断网络情况好坏 ''' def check_network_quality(latencies, device, mobile): avg_latency = sum(latencies) / len(latencies) max_latency = max(latencies) packet_loss = 100 - (len(latencies) / 6) * 100 loggerKit.info('设备{0}->{1}, avg_latency=>{2}, max_latency={3}, packet_loss={4}', device, mobile, avg_latency, max_latency, packet_loss) if avg_latency <= 50 and max_latency <= 100 and packet_loss == 0: return DeviceStatus.Device_Net_Good elif avg_latency <= 200 and max_latency <= 300 and packet_loss <= 85: return DeviceStatus.Device_Net_Common else: return DeviceStatus.Device_Net_Bad def seconds_to_str(s): """秒转化天 时 分 秒""" day = s / 86400 hour = (s % 86400) / 3600 minute = (s % 3600) / 60 secs = (s % 60) ret = '' if day: ret += '%d天' % day if hour: ret += '%d小时' % hour if minute: ret += '%d分' % minute if secs: ret += '%d秒' % secs return ret def to_json(obj): """ 对象转json 的函数 :param obj: :return: """ return json.dumps(obj, default=lambda x: x.__dict__, ensure_ascii=False)