# 读取 YAML 文件 import json import threading import time from _imp import release_lock from concurrent.futures import ThreadPoolExecutor import httpx import redis import requests import yaml from tools import loggerKit from tools.common_util import check_network_status from tools.device_status import app_status, network_status from tools.ip_util import get_current_ip from tools.thread_pool import ThreadPoolSingleton with open('config.yaml', 'r') as file: config = yaml.load(file, Loader=yaml.FullLoader) redis_client = redis.Redis(host=config['redis']['host'], port=config['redis']['port'], password=config['redis']['password'], db=config['redis']['db']) lock_timeout = config['redis']['timeout'] net_domain = config['bmp-cp']['net_domain'] extern_domain = config['bmp-cp']['extern_domain'] get_port_serial_url = extern_domain + config['bmp-cp']['get_port_serial_url'] # 设备发现url device_discovery_url = extern_domain + config['bmp-cp']['device_discovery_url'] # 获取设备id接口 port_list_port_serial_url = extern_domain + config['bmp-cp']['port_list_port_serial'] # python server端心跳 server_heart_url = extern_domain + config['bmp-cp']['server_heart_url'] # python all status 传输 status_sync_url = extern_domain + config['bmp-cp']['status_sync_url'] # 获取任务url get_task_url_v2 = extern_domain + config['bmp-cp']['get_task_url_v2'] get_task_url_v3 = extern_domain + config['bmp-cp']['get_task_url_v3'] # 任务执行回调url task_callback_url = extern_domain + config['bmp-cp']['task_callback_url'] # python all status 传输 event_url = config['bmp-cp']['event_url'] # 初始化线程池 thread_pool = ThreadPoolSingleton().get_executor() # 设备状态字典 device_status_list = {} # 从任务中心领取任务 def auto_pull_task(): try: # 获取启动时初始化的设备信息 port_response = httpx.get(port_list_port_serial_url) loggerKit.info("当前可用设备:{0}", port_response.text) data_list = json.loads(port_response.text).get('data') if data_list is not None: for data in data_list: device_id = data.get('deviceId') if device_id is not None or device_id == "null": device_status_list[device_id] = {'app_status': 1, 'connect_enable_status': 1, 'execute_status': 0, 'network_status': 1} if acquire_lock(device_id): thread_pool.submit(device_work, device_id) else: continue else: loggerKit.info("auto_pull_task_func 拉取任务: device_id is null") else: loggerKit.info("auto_pull_task_func 拉取任务: 查询设备信息为none") except Exception as ex: loggerKit.info("auto_pull_task_func 拉取任务异常: {0}", str(ex)) # 设备发现 def device_discovery(): try: current_ip = get_current_ip() loggerKit.info("当前ip:{0},连接设备:{1}", current_ip, device_status_list.keys()) # 调用Java端 服务心跳接口 keys_list = list(device_status_list.keys()) request_data = { # 服务ip "ip": current_ip, "deviceList": keys_list } response = httpx.post(device_discovery_url, json=request_data) if response.status_code == 200: loggerKit.info("设备发现同步成功!, 发送数据:{0}, 同步接口返回:{1}", json.dumps(request_data), response.text) else: loggerKit.error("设备发现同步失败, 状态码: {0}, text返回:{1}", response.status_code, response.text) except Exception as e: loggerKit.error("设备发现同步异常: {0}", str(e)) # 服务心跳上传 def server_heart_beat(): # 更新设备连接 # init_device_status() current_ip = get_current_ip() loggerKit.info("心跳上传ip:{0}", current_ip) # 调用Java端 服务心跳接口 request_data = { # 服务ip "ip": current_ip } requests.post(server_heart_url, json=request_data) # 心跳定时同步 # def synchronization_device_status(url, device, status): # with app.app_context(): # request_params = { # "deviceUuid": device, # "status": status, # "interval": "1" # } # # try: # synchronization_response = httpx.post(url, json=request_params) # loggerKit.info("synchronization_device_status starting, thread[{0}=>{1}], 入参:{2}, 心跳状态同步返回:{3}", # threading.current_thread().name, threading.get_ident(), request_params, # json.dumps(synchronization_response.text, ensure_ascii=False)) # # except Exception as callback_error: # loggerKit.error("synchronization_device_status starting, thread[{0}=>{1}], 心跳状态同步常异{2}", # threading.current_thread().name, threading.get_ident(), str(callback_error)) # 检查本地心跳服务并上传 def check_server_heart(): try: current_ip = get_current_ip() request_data = { # 服务ip "ip": current_ip } response = httpx.post(server_heart_url, json=request_data) if response.status_code == 200: loggerKit.info("发送本地服务心跳成功!, 心跳同步接口返回:{0}", response.text) else: loggerKit.error("发送本地服务心跳失败, 状态码: {0}, text返回:{1}", response.status_code, response.text) except Exception as e: loggerKit.error("发送本地服务心跳异常: {0}", str(e)) # 检查所有设备的连接状态 并同步给管理后台 def sync_device_status(): current_ip = get_current_ip() if device_status_list is None: loggerKit.info("无手机设备连接, 本机 IP: {0}", current_ip) return request_list = [get_request_data(device_key, device_info) for device_key, device_info in device_status_list.items()] request_data = { "ip": current_ip, "maintenanceRequests": request_list } with ThreadPoolExecutor(max_workers=6) as executor: executor.submit(sync_device_req, request_data) def get_request_data(device_key, device_info): app_status_value = app_status.enable.value network_status_value = network_status.enable.value if check_network_status( device_key) else network_status.unable.value device_info['app_status'] = app_status_value device_info['network_status'] = network_status_value return { "appStatus": app_status_value, "connectStatus": device_info['connect_enable_status'], "deviceUuid": device_key, "executeStatus": device_info['execute_status'], "mobileNetworkStatus": network_status_value } # 同步设备状态 def sync_device_req(data): # start_time = time.time() try: loggerKit.info("同步设备状态入参:{0}", json.dumps(data)) response = requests.post(status_sync_url, json=data) if response.status_code == 200: loggerKit.info("同步设备状态成功!, 同步接口返回:{0}", response.text) else: loggerKit.error("同步设备状态成功失败, 状态码: {0}, text返回:{1}", response.status_code, response.text) except Exception as e: loggerKit.error("同步设备状态成功异常: {0}", str(e)) # 获取锁 def acquire_lock(lock_name, acquire_timeout=1): loggerKit.info("设备:{0}获取到redis锁", lock_name) end_time = time.time() + acquire_timeout lock_value = str(time.time() + lock_timeout + 1) while time.time() < end_time: if redis_client.set(lock_name, lock_value, nx=True, ex=lock_timeout): return True time.sleep(0.001) return False # 对已获取锁的key 进行赋值 def redis_set(key_name, value, acquire_timeout=3 * 60 * 60): loggerKit.info("设备号:{0},存入json{1}", key_name, value) end_time = time.time() + acquire_timeout while time.time() < end_time: if redis_client.set(key_name, value, ex=lock_timeout): return True time.sleep(0.001) return False # 传入对应的设备号 def device_work(device_id): try: # 校验都通过 根据设备id调用Java接口 获取任务 request_data = { # 设备ID "deviceUuid": device_id, "taskType": "contentInteraction", "taskSubType": "trainAccount" } response = None try: response = requests.post(get_task_url_v3, json=request_data, timeout=5) loggerKit.info('thread[{0}=>{1}], 设备:{2} -> 拉取任务数据:{3}', threading.current_thread().name, threading.get_ident(), device_id, json.dumps(response.text, ensure_ascii=False)) except Exception as task_error: loggerKit.error('thread[{0}=>{1}], 调用拉取任务异常:{2}', threading.current_thread().name, threading.get_ident(), str(task_error)) release_lock(device_id) event_request('P0', '设备' + device_id + '任务拉取服务端接口异常', time.time(), str(task_error), 'error') return if not response.ok: loggerKit.error('thread[{0}=>{1}], 请求信息异常:{2}', threading.current_thread().name, threading.get_ident(), json.dumps(response.text, ensure_ascii=False)) release_lock(device_id) params = event_request('P0', '设备' + device_id + '任务拉取失败', time.time(), json.loads(response.text).get('data'), 'error') requests.post(event_url, json=params) return data = json.loads(response.text).get('data') if data is None: release_lock(device_id) return redis_set(device_id + "_task_json", json.dumps(data)) except Exception as result: loggerKit.error("thread[{0}=>{1}], 设备号:{3},任务异常:{4}", threading.current_thread().name, threading.get_ident(), device_id, json.dumps(result, ensure_ascii=False)) release_lock(device_id) return # 发送钉钉告警短信请求拼接 def event_request(level, alert_name, start_time, message, status): response = { "level": level, "alertname": alert_name, "startTime": start_time, "message": message, "status": status } return response # 释放锁 def release_lock(lock_name): loggerKit.info("设备:{0}redis锁释放", lock_name) redis_client.delete(lock_name) # post请求 def call_back_request(error_code, error_msg, task_id, task_status, task_execute_response, content, demo_flag): response = { "errorCode": error_code, "errorMsg": error_msg, "taskId": task_id, "taskStatus": task_status, "taskExecuteResponse": task_execute_response, "content": content, "demoFlag": demo_flag } return response def callback_task(err_code, err_msg, task_id, device_id, execute_status, result): callback_request2 = call_back_request(err_code, err_msg, task_id, execute_status, result, None, None) loggerKit.info("thread[{0}=>{1}], taskId:{2}, 设备号:{3}, url:{4}, 执行状态:{5}, result:{6} 。回调开始", threading.current_thread().name, threading.get_ident(), task_id, device_id, task_callback_url, execute_status, result) current_timeout = (5, 10) callback_response2 = requests.post(task_callback_url, json=callback_request2, timeout=current_timeout) loggerKit.info("thread[{0}=>{1}], taskId:{2},设备号:{3}。回调结束:{4}, 执行状态:{5}, result:{6}", threading.current_thread().name, threading.get_ident(), task_id, device_id, json.dumps(callback_response2.text, ensure_ascii=False), execute_status, result) return callback_response2 def call_back_task_reply(err_code, err_msg, task_id, device_id, execute_status, result, content): callback_request2 = call_back_request_reply(err_code, err_msg, task_id, execute_status, result, content, None) loggerKit.info("thread[{0}=>{1}], taskId:{2}, 设备号:{3}, url:{4}, 执行状态:{5}, requestJson:{6} 。回调开始", threading.current_thread().name, threading.get_ident(), task_id, device_id, task_callback_url, execute_status, callback_request2) current_timeout = (5, 10) callback_response2 = requests.post(task_callback_url, json=callback_request2, timeout=current_timeout) loggerKit.info("thread[{0}=>{1}], taskId:{2},设备号:{3}。回调结束:{4}, 执行状态:{5}, result:{6}", threading.current_thread().name, threading.get_ident(), task_id, device_id, json.dumps(callback_response2.text, ensure_ascii=False), execute_status, result) return callback_response2 """ 回调参数(有评论内容版) """ # post请求 def call_back_request_reply(error_code, error_msg, task_id, task_status, task_execute_response, content, demo_flag): response = { "errorCode": error_code, "errorMsg": error_msg, "taskId": task_id, "taskStatus": task_status, "taskExecuteResponse": task_execute_response, "content": content, "demoFlag": demo_flag } return response # 回复详情 class content_detail: def __init__(self, keyword, title, reply, account_name): """ :rtype: object """ # 搜索关键词 self.keyword = keyword self.title = title self.reply = reply self.account_name = account_name def to_dict(self): return { 'keyword': self.keyword, 'title': self.title, 'reply': self.reply, 'accountName': self.account_name }