||
- # 读取 YAML 文件
- import json
- import threading
- import time
- from _imp import release_lock
- from concurrent.futures import ThreadPoolExecutor
- import httpx
- import redis
- import requests
- import yaml
- from tools import loggerKit
- from tools.common_util import check_network_status
- from tools.device_status import app_status, network_status
- from tools.ip_util import get_current_ip
- from tools.thread_pool import ThreadPoolSingleton
- with open('config.yaml', 'r') as file:
- config = yaml.load(file, Loader=yaml.FullLoader)
- redis_client = redis.Redis(host=config['redis']['host'], port=config['redis']['port'],
- password=config['redis']['password'], db=config['redis']['db'])
- lock_timeout = config['redis']['timeout']
- net_domain = config['bmp-cp']['net_domain']
- extern_domain = config['bmp-cp']['extern_domain']
- get_port_serial_url = extern_domain + config['bmp-cp']['get_port_serial_url']
- # 设备发现url
- device_discovery_url = extern_domain + config['bmp-cp']['device_discovery_url']
- # 获取设备id接口
- port_list_port_serial_url = extern_domain + config['bmp-cp']['port_list_port_serial']
- # python server端心跳
- server_heart_url = extern_domain + config['bmp-cp']['server_heart_url']
- # python all status 传输
- status_sync_url = extern_domain + config['bmp-cp']['status_sync_url']
- # 获取任务url
- get_task_url_v2 = extern_domain + config['bmp-cp']['get_task_url_v2']
- get_task_url_v3 = extern_domain + config['bmp-cp']['get_task_url_v3']
- # 任务执行回调url
- task_callback_url = extern_domain + config['bmp-cp']['task_callback_url']
- # python all status 传输
- event_url = config['bmp-cp']['event_url']
- # 初始化线程池
- thread_pool = ThreadPoolSingleton().get_executor()
- # 设备状态字典
- device_status_list = {}
- # 从任务中心领取任务
- def auto_pull_task():
- try:
- # 获取启动时初始化的设备信息
- port_response = httpx.get(port_list_port_serial_url)
- loggerKit.info("当前可用设备:{0}", port_response.text)
- data_list = json.loads(port_response.text).get('data')
- if data_list is not None:
- for data in data_list:
- device_id = data.get('deviceId')
- if device_id is not None or device_id == "null":
- device_status_list[device_id] = {'app_status': 1, 'connect_enable_status': 1, 'execute_status': 0,
- 'network_status': 1}
- if acquire_lock(device_id):
- thread_pool.submit(device_work, device_id)
- else:
- continue
- else:
- loggerKit.info("auto_pull_task_func 拉取任务: device_id is null")
- else:
- loggerKit.info("auto_pull_task_func 拉取任务: 查询设备信息为none")
- except Exception as ex:
- loggerKit.info("auto_pull_task_func 拉取任务异常: {0}", str(ex))
- # 设备发现
- def device_discovery():
- try:
- current_ip = get_current_ip()
- loggerKit.info("当前ip:{0},连接设备:{1}", current_ip, device_status_list.keys())
- # 调用Java端 服务心跳接口
- keys_list = list(device_status_list.keys())
- request_data = {
- # 服务ip
- "ip": current_ip,
- "deviceList": keys_list
- }
- response = httpx.post(device_discovery_url, json=request_data)
- if response.status_code == 200:
- loggerKit.info("设备发现同步成功!, 发送数据:{0}, 同步接口返回:{1}", json.dumps(request_data),
- response.text)
- else:
- loggerKit.error("设备发现同步失败, 状态码: {0}, text返回:{1}", response.status_code, response.text)
- except Exception as e:
- loggerKit.error("设备发现同步异常: {0}", str(e))
- # 服务心跳上传
- def server_heart_beat():
- # 更新设备连接
- # init_device_status()
- current_ip = get_current_ip()
- loggerKit.info("心跳上传ip:{0}", current_ip)
- # 调用Java端 服务心跳接口
- request_data = {
- # 服务ip
- "ip": current_ip
- }
- requests.post(server_heart_url, json=request_data)
- # 心跳定时同步
- # def synchronization_device_status(url, device, status):
- # with app.app_context():
- # request_params = {
- # "deviceUuid": device,
- # "status": status,
- # "interval": "1"
- # }
- #
- # try:
- # synchronization_response = httpx.post(url, json=request_params)
- # loggerKit.info("synchronization_device_status starting, thread[{0}=>{1}], 入参:{2}, 心跳状态同步返回:{3}",
- # threading.current_thread().name, threading.get_ident(), request_params,
- # json.dumps(synchronization_response.text, ensure_ascii=False))
- #
- # except Exception as callback_error:
- # loggerKit.error("synchronization_device_status starting, thread[{0}=>{1}], 心跳状态同步常异{2}",
- # threading.current_thread().name, threading.get_ident(), str(callback_error))
- # 检查本地心跳服务并上传
- def check_server_heart():
- try:
- current_ip = get_current_ip()
- request_data = {
- # 服务ip
- "ip": current_ip
- }
- response = httpx.post(server_heart_url, json=request_data)
- if response.status_code == 200:
- loggerKit.info("发送本地服务心跳成功!, 心跳同步接口返回:{0}", response.text)
- else:
- loggerKit.error("发送本地服务心跳失败, 状态码: {0}, text返回:{1}", response.status_code, response.text)
- except Exception as e:
- loggerKit.error("发送本地服务心跳异常: {0}", str(e))
- # 检查所有设备的连接状态 并同步给管理后台
- def sync_device_status():
- current_ip = get_current_ip()
- if device_status_list is None:
- loggerKit.info("无手机设备连接, 本机 IP: {0}", current_ip)
- return
- request_list = [get_request_data(device_key, device_info) for device_key, device_info in device_status_list.items()]
- request_data = {
- "ip": current_ip,
- "maintenanceRequests": request_list
- }
- with ThreadPoolExecutor(max_workers=6) as executor:
- executor.submit(sync_device_req, request_data)
- def get_request_data(device_key, device_info):
- app_status_value = app_status.enable.value
- network_status_value = network_status.enable.value if check_network_status(
- device_key) else network_status.unable.value
- device_info['app_status'] = app_status_value
- device_info['network_status'] = network_status_value
- return {
- "appStatus": app_status_value,
- "connectStatus": device_info['connect_enable_status'],
- "deviceUuid": device_key,
- "executeStatus": device_info['execute_status'],
- "mobileNetworkStatus": network_status_value
- }
- # 同步设备状态
- def sync_device_req(data):
- # start_time = time.time()
- try:
- loggerKit.info("同步设备状态入参:{0}", json.dumps(data))
- response = requests.post(status_sync_url, json=data)
- if response.status_code == 200:
- loggerKit.info("同步设备状态成功!, 同步接口返回:{0}", response.text)
- else:
- loggerKit.error("同步设备状态成功失败, 状态码: {0}, text返回:{1}", response.status_code, response.text)
- except Exception as e:
- loggerKit.error("同步设备状态成功异常: {0}", str(e))
- # 获取锁
- def acquire_lock(lock_name, acquire_timeout=1):
- loggerKit.info("设备:{0}获取到redis锁", lock_name)
- end_time = time.time() + acquire_timeout
- lock_value = str(time.time() + lock_timeout + 1)
- while time.time() < end_time:
- if redis_client.set(lock_name, lock_value, nx=True, ex=lock_timeout):
- return True
- time.sleep(0.001)
- return False
- # 对已获取锁的key 进行赋值
- def redis_set(key_name, value, acquire_timeout=3 * 60 * 60):
- loggerKit.info("设备号:{0},存入json{1}", key_name, value)
- end_time = time.time() + acquire_timeout
- while time.time() < end_time:
- if redis_client.set(key_name, value, ex=lock_timeout):
- return True
- time.sleep(0.001)
- return False
- # 传入对应的设备号
- def device_work(device_id):
- try:
- # 校验都通过 根据设备id调用Java接口 获取任务
- request_data = {
- # 设备ID
- "deviceUuid": device_id,
- "taskType": "contentInteraction",
- "taskSubType": "trainAccount"
- }
- response = None
- try:
- response = requests.post(get_task_url_v3, json=request_data, timeout=5)
- loggerKit.info('thread[{0}=>{1}], 设备:{2} -> 拉取任务数据:{3}', threading.current_thread().name,
- threading.get_ident(), device_id,
- json.dumps(response.text, ensure_ascii=False))
- except Exception as task_error:
- loggerKit.error('thread[{0}=>{1}], 调用拉取任务异常:{2}', threading.current_thread().name,
- threading.get_ident(), str(task_error))
- release_lock(device_id)
- event_request('P0', '设备' + device_id + '任务拉取服务端接口异常', time.time(), str(task_error), 'error')
- return
- if not response.ok:
- loggerKit.error('thread[{0}=>{1}], 请求信息异常:{2}', threading.current_thread().name,
- threading.get_ident(),
- json.dumps(response.text, ensure_ascii=False))
- release_lock(device_id)
- params = event_request('P0', '设备' + device_id + '任务拉取失败', time.time(),
- json.loads(response.text).get('data'), 'error')
- requests.post(event_url, json=params)
- return
- data = json.loads(response.text).get('data')
- if data is None:
- release_lock(device_id)
- return
- redis_set(device_id + "_task_json", json.dumps(data))
- except Exception as result:
- loggerKit.error("thread[{0}=>{1}], 设备号:{3},任务异常:{4}", threading.current_thread().name,
- threading.get_ident(), device_id,
- json.dumps(result, ensure_ascii=False))
- release_lock(device_id)
- return
- # 发送钉钉告警短信请求拼接
- def event_request(level, alert_name, start_time, message, status):
- response = {
- "level": level,
- "alertname": alert_name,
- "startTime": start_time,
- "message": message,
- "status": status
- }
- return response
- # 释放锁
- def release_lock(lock_name):
- loggerKit.info("设备:{0}redis锁释放", lock_name)
- redis_client.delete(lock_name)
- # post请求
- def call_back_request(error_code, error_msg, task_id, task_status, task_execute_response, content, demo_flag):
- response = {
- "errorCode": error_code,
- "errorMsg": error_msg,
- "taskId": task_id,
- "taskStatus": task_status,
- "taskExecuteResponse": task_execute_response,
- "content": content,
- "demoFlag": demo_flag
- }
- return response
- def callback_task(err_code, err_msg, task_id, device_id, execute_status, result):
- callback_request2 = call_back_request(err_code, err_msg, task_id, execute_status, result, None, None)
- loggerKit.info("thread[{0}=>{1}], taskId:{2}, 设备号:{3}, url:{4}, 执行状态:{5}, result:{6} 。回调开始", threading.current_thread().name,
- threading.get_ident(), task_id, device_id, task_callback_url, execute_status, result)
- current_timeout = (5, 10)
- callback_response2 = requests.post(task_callback_url, json=callback_request2, timeout=current_timeout)
- loggerKit.info("thread[{0}=>{1}], taskId:{2},设备号:{3}。回调结束:{4}, 执行状态:{5}, result:{6}", threading.current_thread().name,
- threading.get_ident(), task_id, device_id, json.dumps(callback_response2.text, ensure_ascii=False),
- execute_status, result)
- return callback_response2
- def call_back_task_reply(err_code, err_msg, task_id, device_id, execute_status, result, content):
- callback_request2 = call_back_request_reply(err_code, err_msg, task_id, execute_status, result, content, None)
- loggerKit.info("thread[{0}=>{1}], taskId:{2}, 设备号:{3}, url:{4}, 执行状态:{5}, requestJson:{6} 。回调开始",
- threading.current_thread().name,
- threading.get_ident(), task_id, device_id, task_callback_url, execute_status, callback_request2)
- current_timeout = (5, 10)
- callback_response2 = requests.post(task_callback_url, json=callback_request2, timeout=current_timeout)
- loggerKit.info("thread[{0}=>{1}], taskId:{2},设备号:{3}。回调结束:{4}, 执行状态:{5}, result:{6}",
- threading.current_thread().name,
- threading.get_ident(), task_id, device_id, json.dumps(callback_response2.text, ensure_ascii=False),
- execute_status, result)
- return callback_response2
- """
- 回调参数(有评论内容版)
- """
- # post请求
- def call_back_request_reply(error_code, error_msg, task_id, task_status, task_execute_response, content, demo_flag):
- response = {
- "errorCode": error_code,
- "errorMsg": error_msg,
- "taskId": task_id,
- "taskStatus": task_status,
- "taskExecuteResponse": task_execute_response,
- "content": content,
- "demoFlag": demo_flag
- }
- return response
- # 回复详情
- class content_detail:
- def __init__(self, keyword, title, reply, account_name):
- """
- :rtype: object
- """
- # 搜索关键词
- self.keyword = keyword
- self.title = title
- self.reply = reply
- self.account_name = account_name
- def to_dict(self):
- return {
- 'keyword': self.keyword,
- 'title': self.title,
- 'reply': self.reply,
- 'accountName': self.account_name
- }
|