||
- """
- 通用工具类
- """
- import math
- import os
- import re
- import socket
- import subprocess
- import sys
- import telnetlib
- import threading
- import time
- import requests
- import uiautomator2
- import yaml
- from flask import json
- from tools import loggerKit
- from tools.device_status import DeviceStatus
# Load the YAML configuration file. The path is relative, so it is resolved
# against the process's current working directory at import time.
with open('config.yaml', 'r') as file:
    config = yaml.load(file, Loader=yaml.FullLoader)
# wework_package = config['wework']['package']
# net_domain = 'http://api-qa.risingauto.net/'
# Base URL read from the `bmp-cp` section (presumably the internal-network
# endpoint, judging by the commented-out sample above -- TODO confirm).
net_domain = config['bmp-cp']['net_domain']
# extern_domain = 'https://api-qa.risingauto.com/'
# Base URL read from the `bmp-cp` section; other helpers in this module strip
# its scheme and slashes to obtain a host name to ping.
extern_domain = config['bmp-cp']['extern_domain']
def get_ip_info(ip):
    """Look up an IP address and print the location fields of the answer.

    Queries the pconline whois JSON endpoint, prints the raw response text and
    then the ``pro``, ``city`` and ``addr`` fields. Nothing is printed when the
    HTTP response is not OK.

    :param ip: IP address interpolated into the query URL.
    :return: None
    """
    # A timeout keeps a stalled lookup from blocking the caller indefinitely.
    response = requests.get('http://whois.pconline.com.cn/ipJson.jsp?ip=%s&json=true' % (ip), timeout=10)
    if not response.ok:
        return
    print(response.text)
    data = json.loads(response.text)
    # The values must be applied with the % operator; passing them as extra
    # print() arguments would emit the literal '%s' placeholders.
    print('%s, %s, %s' % (data['pro'], data['city'], data['addr']))
def get_city(ip):
    """Return the ``city`` field reported for an IP address.

    :param ip: IP address interpolated into the query URL.
    :return: the ``city`` value of the JSON answer, or None when the HTTP
        response is not OK.
    """
    # A timeout keeps a stalled lookup from blocking the caller indefinitely.
    response = requests.get('http://whois.pconline.com.cn/ipJson.jsp?ip=%s&json=true' % (ip), timeout=10)
    if not response.ok:
        return None
    data = json.loads(response.text)
    return data['city']
def get_local_ip():
    """Return the local IP address of this host.

    A UDP socket is connected to 8.8.8.8:80 and the local address chosen for
    it is read back with ``getsockname()``.

    :return: the local IP address as a string.
    :raises OSError: when the socket cannot be created or connected.
    """
    # The context manager closes the socket on every path. The previous
    # ``finally: s.close()`` raised AttributeError on ``None`` whenever
    # socket creation itself failed, hiding the real error.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.connect(('8.8.8.8', 80))
        return s.getsockname()[0]
def get_local_devices():
    """List the devices that ``adb devices`` reports in the ``device`` state.

    The identifiers are also printed, or a notice is printed when there are
    none.

    :return: list of device identifiers (the text before the first tab).
    """
    adb_result = subprocess.run(['adb', 'devices'], capture_output=True, text=True)
    # The first output line is skipped; only entries containing "<tab>device"
    # are kept.
    entries = adb_result.stdout.strip().split('\n')[1:]
    devices = [entry.split('\t')[0] for entry in entries if '\tdevice' in entry]
    if not devices:
        print("没有已连接的设备")
        return devices
    print("已连接设备:")
    for serial in devices:
        print(serial)
    return devices
def check_device_connect(device):
    """Report whether ``device`` is among the devices from get_local_devices().

    The outcome is also printed.

    :param device: device identifier string.
    :return: True when the device is listed, otherwise False.
    """
    connected = device in get_local_devices()
    print(device + ("已连接" if connected else "未连接"))
    return connected
def is_app_running(device_id, package_name):
    """Check whether a process for ``package_name`` exists on the device.

    Runs ``adb -s <device_id> shell pidof <package_name>``.

    :param device_id: adb serial of the target device.
    :param package_name: name passed to ``pidof``.
    :return: True when the command exits with 0 and prints one or more
        numeric PIDs, otherwise False.
    """
    command = ['adb', '-s', device_id, 'shell', 'pidof', package_name]
    result = subprocess.run(command, capture_output=True, text=True)
    # pidof may print several whitespace-separated PIDs; validate each token
    # instead of requiring the whole output to be a single number.
    pids = result.stdout.split()
    return result.returncode == 0 and bool(pids) and all(pid.isdigit() for pid in pids)
def check_app_running(device_id, package_name):
    """Check the running state of an app via ``adb shell pidof``.

    :param device_id: adb serial of the target device.
    :param package_name: name passed to ``pidof``.
    :return: True when the command prints one or more numeric PIDs, False
        when it succeeds with any other output, and True when the command
        exits with a non-zero status (see the review note below).
    """
    command = ['adb', '-s', device_id, 'shell', 'pidof', package_name]
    try:
        output = subprocess.check_output(command, stderr=subprocess.STDOUT, text=True)
    except subprocess.CalledProcessError as e:
        loggerKit.error("Failed to check if app {0} is running on device {1}, 异常信息:{2}", package_name, device_id,
                        e.output)
        # NOTE(review): a failed check is reported as "running". This is kept
        # as-is because callers may rely on it, but it disagrees with
        # check_app_status, which returns False here -- confirm the intent.
        return True
    # pidof may print several whitespace-separated PIDs; validate each token
    # instead of requiring the whole output to be a single number.
    pids = output.split()
    return bool(pids) and all(pid.isdigit() for pid in pids)
def check_app_status(device_id, package_name):
    """Check the running state of an app via ``adb shell pidof``.

    :param device_id: adb serial of the target device.
    :param package_name: name passed to ``pidof``.
    :return: True when the command exits with status 0; False (after logging
        the error) when it exits with a non-zero status.
    """
    command = ['adb', '-s', device_id, 'shell', 'pidof', package_name]
    try:
        # check=True turns every non-zero exit status into CalledProcessError,
        # so reaching the line after this call already means success. The
        # former ``returncode != 0`` branch could never run and was removed.
        subprocess.run(command, capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError as e:
        loggerKit.error("Failed to check if app {0} is running on device {1}, 异常信息:{2}", package_name, device_id,
                        e)
        return False
    return True
def check_mobile_network(device_id):
    """Check device connectivity with one ping sent through adb.

    Runs ``ping -c 1 -W 1 api-qa.risingauto.com`` on the device and logs the
    result together with the current thread name and ident.

    :param device_id: adb serial of the target device.
    :return: True when the command succeeds, False when it exits with a
        non-zero status.
    """
    ping_cmd = ["adb", "-s", device_id, "shell", "ping", "-c", "1", "-W", "1", "api-qa.risingauto.com"]
    try:
        ping_output = subprocess.check_output(ping_cmd)
    except subprocess.CalledProcessError as err:
        loggerKit.error("thread[{0}=>{1}], 设备号:{2}, 网络异常,返回结果:{3}", threading.current_thread().name,
                        threading.get_ident(), device_id,
                        err.output)
        return False
    loggerKit.info("thread[{0}=>{1}], 设备号:{2}, 网络正常,返回结果:{3}", threading.current_thread().name,
                   threading.get_ident(), device_id, ping_output)
    return True
def check_network_status(device_id):
    """Check device connectivity with three pings sent through adb.

    The host is ``extern_domain`` with its scheme and slashes removed.

    :param device_id: adb serial of the target device.
    :return: True when all three replies arrive or only part of them are
        lost; False on 100% packet loss, on a non-zero exit status, or when
        the command does not finish within 4 seconds.
    """
    ping_domain = extern_domain.replace("https://", "").replace("http://", "").replace("/", "")
    # An argument list (no shell) keeps the device id from being interpreted
    # by a shell.
    command = ["adb", "-s", device_id, "shell", "ping", "-c", "3", ping_domain]
    print(f'command: {" ".join(command)}')
    thread_name = threading.current_thread().name
    thread_id = threading.get_ident()
    try:
        output = subprocess.check_output(command, timeout=4).decode('utf-8')
    except subprocess.TimeoutExpired as e:
        # A ping that overruns the timeout used to escape as an unhandled
        # exception; treat it as an unavailable network instead.
        loggerKit.error("thread[{0}=>{1}], 设备号:{2}, ping超时,返回结果:{3}", thread_name, thread_id, device_id,
                        e.output)
        return False
    except subprocess.CalledProcessError as e:
        loggerKit.error("thread[{0}=>{1}], 设备号:{2}, ping出现异常,返回结果:{3}", thread_name, thread_id, device_id,
                        e.output)
        return False

    if '3 received' in output or '3 packets received' in output:
        loggerKit.info("thread[{0}=>{1}], 设备号:{2}, 网络正常,返回结果:{3}", thread_name, thread_id, device_id,
                       output)
        return True
    if '100% packet loss' in output:
        loggerKit.info("thread[{0}=>{1}], 设备号:{2}, 网络不可用, 100%丢包率, 返回结果:{3}", thread_name, thread_id,
                       device_id, output)
        return False
    # Some, but not all, packets were lost: still treated as usable.
    loggerKit.info("thread[{0}=>{1}], 设备号:{2}, 出现部分丢包情况, 返回结果:{3}", thread_name, thread_id,
                   device_id, output)
    return True
def is_number(input_str):
    """Tell whether ``input_str`` looks like a number.

    :param input_str: string to inspect.
    :return: True when ``str.isdigit()`` holds or ``float()`` accepts the
        string, otherwise False.
    """
    if input_str.isdigit():
        return True
    try:
        float(input_str)
    except ValueError:
        return False
    return True
def contains_digit(string):
    """Return True when at least one character of ``string`` is a digit.

    :param string: iterable of characters to scan.
    :return: True on the first character for which ``isdigit()`` holds,
        False when there is none.
    """
    return any(ch.isdigit() for ch in string)
def delete_image(image_path):
    """Remove the file at ``image_path`` if it exists.

    A notice is printed when the path does not exist.

    :param image_path: path of the file to delete.
    :return: None
    """
    if not os.path.exists(image_path):
        print(f"Image {image_path} does not exist")
        return
    os.remove(image_path)
def param_error(param):
    """Tell whether an OCR-read value contains unexpected characters.

    The characters ``万``, ``%`` and ``,`` are stripped first; the value is
    then considered faulty when anything other than digits and ``.`` remains.

    :param param: OCR result string.
    :return: True when the value is faulty, False otherwise.
    """
    loggerKit.info("开始OCR参数对比:{0}", param)
    # NOTE(review): the original checked ',' twice in a row, so the second
    # check never did anything. It may have been meant for the full-width
    # comma ',' -- confirm before adding it here.
    for token in ('万', '%', ','):
        param = param.replace(token, '')
    if re.search(r'[^0-9.]', param):
        loggerKit.info("OCR参数对比有异常:{0}", param)
        return True
    loggerKit.info("OCR参数对比无异常:{0}", param)
    return False
def contains_substring(string, substring):
    """Tell whether ``substring`` occurs in ``string``.

    :param string: value to search in.
    :param substring: value to look for.
    :return: True when ``substring in string`` holds, otherwise False.
    """
    return substring in string
def check_telnet(host, port):
    """Check whether a TCP connection to ``host:port`` can be opened.

    :param host: host name or address of the proxy to probe.
    :param port: TCP port to probe.
    :return: True when the connection is established within 10 seconds,
        False on refusal, timeout or any other error (other errors are
        printed).
    """
    try:
        # A plain TCP connect is all the check needs; it replaces
        # telnetlib.Telnet, which is deprecated and removed in Python 3.13.
        with socket.create_connection((host, port), timeout=10):
            return True
    except (ConnectionRefusedError, TimeoutError):
        return False
    except Exception as e:
        print("An error occurred:", e)
        return False
def cap_device(d, device, task_id):
    """Take a screenshot of a device and save it under ``screen/<device>/``.

    The directory is created next to the script named in ``sys.argv[0]``.
    Any failure is logged and swallowed, so this never raises to the caller.

    :param d: uiautomator2 object; must provide ``screenshot()``.
    :param device: device id.
    :param task_id: task id, used in the file name.
    :return: None
    """
    try:
        app_dir = os.path.dirname(sys.argv[0])
        # Sanitise only the device id. Applying the replacement to the whole
        # path also rewrote ':' and '.' inside the application directory
        # (for example a leading './' or a Windows drive letter).
        safe_device = str(device).replace(":", "_").replace(".", "_")
        tmp_path = os.path.join(app_dir, 'screen', safe_device)
        # exist_ok avoids a failure when the directory appears between a
        # separate existence check and the creation.
        os.makedirs(tmp_path, exist_ok=True)
        file_path = os.path.join(tmp_path, f'{device}_{task_id}_{time.time()}.png')
        d.screenshot().save(file_path)
        loggerKit.info(f'设备:{device} 执行任务出现异常,截图保存:{file_path}, tmp_path:{tmp_path}')
    except Exception as e:
        loggerKit.error(f'设备:{device} 执行任务出现异常后截图保存发生异常:{str(e)}')
def _ensure_atx_agent(device, mobile, phase):
    """Start atx-agent on the device when it is not alive, logging either way."""
    client = uiautomator2.connect(device)
    if client.agent_alive:
        loggerKit.info('atx-agent 服务正常, atx 已连接{0}->{1}, ' + phase, device, mobile)
        return
    # Start the agent again in case it was stopped by hand on the device.
    start_cmd = ['adb', '-s', device, 'shell', '/data/local/tmp/atx-agent', 'server', '-d', '--nouia']
    start_output = subprocess.check_output(start_cmd)
    loggerKit.info(f"启动设备{device}->{mobile} atx agent 服务, {start_output}, {phase}")


def _status_of_connected_device(device, mobile):
    """Derive the status of an already-connected device from `adb devices -l`."""
    listing = subprocess.check_output(['adb', 'devices', '-l'])
    # Filter in Python rather than piping through grep: grep exits with 1
    # when nothing matches, which surfaced as an unhandled CalledProcessError.
    needle = str(device).encode()
    show_output = b"\n".join(line for line in listing.splitlines() if needle in line)
    loggerKit.info(f"设备{device}->{mobile}, {show_output}")

    if b"device product" in show_output:
        loggerKit.info("设备在线{0}->{1}, 可以领取任务", device, mobile)
        d = uiautomator2.connect(device)
        running = d.uiautomator.running()
        loggerKit.info("{0}->{1}=>{2}", running, device, mobile)
        if running:
            # No network-quality check is performed here: a device whose
            # uiautomator service is running is reported as available.
            loggerKit.info('atx 连接正常{0}, uiautomator2 服务正常', device)
            return DeviceStatus.Available_For_Task
        loggerKit.info('atx 连接正常 {0}->{1}, uiautomator2 服务待人工启动, 不能领取任务', device, mobile)
        time.sleep(1)
        return DeviceStatus.To_Be_Manually_Started

    if b"offline product" in show_output:
        loggerKit.info("设备不在线{0}->{1}, 不能领取任务", device, mobile)
        return DeviceStatus.Device_Offline

    loggerKit.info("{0}->{1}=>{2}", device, mobile, show_output)
    return DeviceStatus.Device_Offline


def check_single_adb_connect(device, mobile):
    """Connect to a device over adb and report its status.

    The branch is chosen from the output of ``adb connect <device>``:

    * ``Connection refused``: the phone still has to be activated.
    * ``already connected``: atx-agent is started if needed, then the status
      comes from the device's ``adb devices -l`` entry and from whether its
      uiautomator service is running.
    * ``connected to``: atx-agent is started if needed; first connection.
    * anything else: reported as a first successful connection.

    :param device: adb address/serial of the device.
    :param mobile: phone identifier, used only in log messages.
    :return: a ``DeviceStatus`` member.
    :raises subprocess.CalledProcessError: when an adb command exits with a
        non-zero status.
    """
    conn_output = subprocess.check_output(['adb', 'connect', device])
    loggerKit.info(f"{conn_output}")

    if b'Connection refused' in conn_output:
        loggerKit.info('手机待激活{0}->{1},不能领取任务', device, mobile)
        return DeviceStatus.Mobile_Phone_To_Be_Activated

    # 'already connected' must be tested before 'connected to', because the
    # former message also contains the latter text.
    if b'already connected' in conn_output:
        _ensure_atx_agent(device, mobile, '再次成功连接')
        return _status_of_connected_device(device, mobile)

    if b'connected to' in conn_output:
        _ensure_atx_agent(device, mobile, '首次成功连接')
        return DeviceStatus.First_Success_Connect

    # loggerKit is a module and cannot be called directly; use loggerKit.info.
    loggerKit.info('atx已连接{0}->{1}, 首次成功连接', device, mobile)
    return DeviceStatus.First_Success_Connect
def check_wifi_info(device, mobile):
    """Ping the external domain from a device and print the network quality.

    Connects with ``adb connect``, runs ``ping -c 6`` on the device against
    ``extern_domain`` (scheme and slashes removed), extracts the latencies
    and prints the resulting quality rating.

    :param device: adb address/serial of the device.
    :param mobile: phone identifier, used only in log messages.
    :return: None
    :raises subprocess.CalledProcessError: when an adb command exits with a
        non-zero status.
    """
    target_url = extern_domain.replace("https://", "").replace("http://", "").replace("/", "")
    # Argument lists (no shell) keep the device id from being interpreted by
    # a shell.
    conn_output = subprocess.check_output(['adb', 'connect', device])
    loggerKit.info(f"设备:{device} => {conn_output}")
    wifi_output = subprocess.check_output(['adb', '-s', device, 'shell', 'ping', '-c', '6', target_url])
    loggerKit.info(f"设备:{device} => {wifi_output}")
    wifi_str = wifi_output.decode('utf-8')
    latencies = extract_latency(wifi_str, device, mobile)
    if latencies:
        network_quality = check_network_quality(latencies, device, mobile)
    else:
        # Without any latency sample there is nothing to average; rate the
        # network as bad instead of dividing by zero.
        network_quality = DeviceStatus.Device_Net_Bad
    print(f"设备:{device} => {network_quality}")
def check_remote_port(host, port):
    """Check whether a remote TCP port accepts connections.

    :param host: host name or address.
    :param port: TCP port number.
    :return: True when ``connect_ex`` reports success within 5 seconds,
        otherwise False (exceptions are logged and also yield False).
    """
    try:
        # The context manager closes the socket even when connect_ex raises
        # (for example on an unresolvable host), which used to leak it.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(5)
            return sock.connect_ex((host, port)) == 0
    except Exception as e:
        loggerKit.error('check_remote_port, exception:{0}', e)
        return False
def get_distance(lat1, lon1, lat2, lon2):
    """Compute the distance between two latitude/longitude points.

    Uses the haversine formula with a sphere radius of 6371000, so the result
    is in the same unit as that radius (metres for an Earth radius).

    :param lat1: latitude of the first point, in degrees.
    :param lon1: longitude of the first point, in degrees.
    :param lat2: latitude of the second point, in degrees.
    :param lon2: longitude of the second point, in degrees.
    :return: the distance along the sphere as a float.
    """
    r = 6371000
    d_lat = (lat2 - lat1) * math.pi / 180
    d_lon = (lon2 - lon1) * math.pi / 180
    sin_half_lat = math.sin(d_lat / 2)
    sin_half_lon = math.sin(d_lon / 2)
    a = sin_half_lat * sin_half_lat + math.cos(lat1 * math.pi / 180) * math.cos(
        lat2 * math.pi / 180) * sin_half_lon * sin_half_lon
    # Rounding (or out-of-range input) can push `a` marginally outside
    # [0, 1], which would make the square roots below raise ValueError.
    a = min(1.0, max(0.0, a))
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return r * c
def extract_latency(ping_result, device, mobile):
    """Extract the round-trip times from ping output.

    :param ping_result: text output of a ping command.
    :param device: device id, used only in the log message.
    :param mobile: phone identifier, used only in the log message.
    :return: list of floats, one per ``time=<number>`` occurrence.
    """
    # The fractional part is optional so that integer values such as
    # "time=105" are captured too; requiring a decimal point silently
    # dropped them.
    latency_pattern = r"time=(\d+(?:\.\d+)?)"
    latencies = re.findall(latency_pattern, ping_result)
    loggerKit.info("设备{0}->{1}, ping_result=>{2}, latencies=>{3}", device, mobile, ping_result, latencies)
    return [float(latency) for latency in latencies]
def check_network_quality(latencies, device, mobile, expected_count=6):
    """Rate the network quality from a list of ping latencies.

    :param latencies: latency values, one per received reply.
    :param device: device id, used only in log messages.
    :param mobile: phone identifier, used only in log messages.
    :param expected_count: number of pings that were sent, used to derive the
        packet-loss percentage; defaults to 6.
    :return: ``DeviceStatus.Device_Net_Good`` when average <= 50, maximum
        <= 100 and there is no loss; ``DeviceStatus.Device_Net_Common`` when
        average <= 200, maximum <= 300 and loss <= 85;
        ``DeviceStatus.Device_Net_Bad`` otherwise, including when
        ``latencies`` is empty.
    """
    if not latencies:
        # No reply at all: the average and maximum are undefined, and
        # computing them would raise ZeroDivisionError / ValueError.
        loggerKit.info('设备{0}->{1}, 未获取到延迟数据, packet_loss=100', device, mobile)
        return DeviceStatus.Device_Net_Bad

    avg_latency = sum(latencies) / len(latencies)
    max_latency = max(latencies)
    packet_loss = 100 - (len(latencies) / expected_count) * 100
    loggerKit.info('设备{0}->{1}, avg_latency=>{2}, max_latency={3}, packet_loss={4}', device, mobile, avg_latency,
                   max_latency, packet_loss)
    if avg_latency <= 50 and max_latency <= 100 and packet_loss == 0:
        return DeviceStatus.Device_Net_Good
    if avg_latency <= 200 and max_latency <= 300 and packet_loss <= 85:
        return DeviceStatus.Device_Net_Common
    return DeviceStatus.Device_Net_Bad
def seconds_to_str(s):
    """Convert a number of seconds to a days/hours/minutes/seconds string.

    Components equal to zero are omitted, so 0 yields an empty string.
    Fractional seconds are truncated.

    :param s: non-negative duration in seconds.
    :return: text such as ``'1天1小时1分1秒'``.
    """
    # Integer arithmetic is required: with true division every component is
    # a non-zero float for almost any input, so e.g. 60 came out as
    # '0天0小时1分'.
    day, remainder = divmod(int(s), 86400)
    hour, remainder = divmod(remainder, 3600)
    minute, secs = divmod(remainder, 60)
    parts = ((day, '天'), (hour, '小时'), (minute, '分'), (secs, '秒'))
    return ''.join('%d%s' % (value, unit) for value, unit in parts if value)
def to_json(obj):
    """Serialise an object to a JSON string.

    Values the encoder cannot handle natively are replaced by their
    ``__dict__``; non-ASCII characters are written as-is.

    :param obj: object to serialise.
    :return: the JSON text.
    """
    def attributes_of(value):
        return value.__dict__

    return json.dumps(obj, default=attributes_of, ensure_ascii=False)
|