# task_job.py
# Reads its settings from config.yaml (see the `with open(...)` statement below the imports).
import json
import threading
import time
# NOTE(review): this looks like an accidental IDE auto-import; the name is rebound by the
# `release_lock` function defined later in this module.
from _imp import release_lock
from concurrent.futures import ThreadPoolExecutor

import httpx
import redis
import requests
import yaml

from tools import loggerKit
from tools.common_util import check_network_status
from tools.device_status import app_status, network_status
from tools.ip_util import get_current_ip
from tools.thread_pool import ThreadPoolSingleton
  16. with open('config.yaml', 'r') as file:
  17. config = yaml.load(file, Loader=yaml.FullLoader)
  18. redis_client = redis.Redis(host=config['redis']['host'], port=config['redis']['port'],
  19. password=config['redis']['password'], db=config['redis']['db'])
  20. lock_timeout = config['redis']['timeout']
  21. net_domain = config['bmp-cp']['net_domain']
  22. extern_domain = config['bmp-cp']['extern_domain']
  23. get_port_serial_url = extern_domain + config['bmp-cp']['get_port_serial_url']
  24. # 设备发现url
  25. device_discovery_url = extern_domain + config['bmp-cp']['device_discovery_url']
  26. # 获取设备id接口
  27. port_list_port_serial_url = extern_domain + config['bmp-cp']['port_list_port_serial']
  28. # python server端心跳
  29. server_heart_url = extern_domain + config['bmp-cp']['server_heart_url']
  30. # python all status 传输
  31. status_sync_url = extern_domain + config['bmp-cp']['status_sync_url']
  32. # 获取任务url
  33. get_task_url_v2 = extern_domain + config['bmp-cp']['get_task_url_v2']
  34. get_task_url_v3 = extern_domain + config['bmp-cp']['get_task_url_v3']
  35. # 任务执行回调url
  36. task_callback_url = extern_domain + config['bmp-cp']['task_callback_url']
  37. # python all status 传输
  38. event_url = config['bmp-cp']['event_url']
  39. # 初始化线程池
  40. thread_pool = ThreadPoolSingleton().get_executor()
  41. # 设备状态字典
  42. device_status_list = {}
  43. # 从任务中心领取任务
  44. def auto_pull_task():
  45. try:
  46. # 获取启动时初始化的设备信息
  47. port_response = httpx.get(port_list_port_serial_url)
  48. loggerKit.info("当前可用设备:{0}", port_response.text)
  49. data_list = json.loads(port_response.text).get('data')
  50. if data_list is not None:
  51. for data in data_list:
  52. device_id = data.get('deviceId')
  53. if device_id is not None or device_id == "null":
  54. device_status_list[device_id] = {'app_status': 1, 'connect_enable_status': 1, 'execute_status': 0,
  55. 'network_status': 1}
  56. if acquire_lock(device_id):
  57. thread_pool.submit(device_work, device_id)
  58. else:
  59. continue
  60. else:
  61. loggerKit.info("auto_pull_task_func 拉取任务: device_id is null")
  62. else:
  63. loggerKit.info("auto_pull_task_func 拉取任务: 查询设备信息为none")
  64. except Exception as ex:
  65. loggerKit.info("auto_pull_task_func 拉取任务异常: {0}", str(ex))
  66. # 设备发现
  67. def device_discovery():
  68. try:
  69. current_ip = get_current_ip()
  70. loggerKit.info("当前ip:{0},连接设备:{1}", current_ip, device_status_list.keys())
  71. # 调用Java端 服务心跳接口
  72. keys_list = list(device_status_list.keys())
  73. request_data = {
  74. # 服务ip
  75. "ip": current_ip,
  76. "deviceList": keys_list
  77. }
  78. response = httpx.post(device_discovery_url, json=request_data)
  79. if response.status_code == 200:
  80. loggerKit.info("设备发现同步成功!, 发送数据:{0}, 同步接口返回:{1}", json.dumps(request_data),
  81. response.text)
  82. else:
  83. loggerKit.error("设备发现同步失败, 状态码: {0}, text返回:{1}", response.status_code, response.text)
  84. except Exception as e:
  85. loggerKit.error("设备发现同步异常: {0}", str(e))
  86. # 服务心跳上传
  87. def server_heart_beat():
  88. # 更新设备连接
  89. # init_device_status()
  90. current_ip = get_current_ip()
  91. loggerKit.info("心跳上传ip:{0}", current_ip)
  92. # 调用Java端 服务心跳接口
  93. request_data = {
  94. # 服务ip
  95. "ip": current_ip
  96. }
  97. requests.post(server_heart_url, json=request_data)
# Periodic heartbeat status sync (disabled; kept for reference)
# def synchronization_device_status(url, device, status):
#     with app.app_context():
#         request_params = {
#             "deviceUuid": device,
#             "status": status,
#             "interval": "1"
#         }
#
#         try:
#             synchronization_response = httpx.post(url, json=request_params)
#             loggerKit.info("synchronization_device_status starting, thread[{0}=>{1}], 入参:{2}, 心跳状态同步返回:{3}",
#                            threading.current_thread().name, threading.get_ident(), request_params,
#                            json.dumps(synchronization_response.text, ensure_ascii=False))
#
#         except Exception as callback_error:
#             loggerKit.error("synchronization_device_status starting, thread[{0}=>{1}], 心跳状态同步常异{2}",
#                             threading.current_thread().name, threading.get_ident(), str(callback_error))
  116. # 检查本地心跳服务并上传
  117. def check_server_heart():
  118. try:
  119. current_ip = get_current_ip()
  120. request_data = {
  121. # 服务ip
  122. "ip": current_ip
  123. }
  124. response = httpx.post(server_heart_url, json=request_data)
  125. if response.status_code == 200:
  126. loggerKit.info("发送本地服务心跳成功!, 心跳同步接口返回:{0}", response.text)
  127. else:
  128. loggerKit.error("发送本地服务心跳失败, 状态码: {0}, text返回:{1}", response.status_code, response.text)
  129. except Exception as e:
  130. loggerKit.error("发送本地服务心跳异常: {0}", str(e))
  131. # 检查所有设备的连接状态 并同步给管理后台
  132. def sync_device_status():
  133. current_ip = get_current_ip()
  134. if device_status_list is None:
  135. loggerKit.info("无手机设备连接, 本机 IP: {0}", current_ip)
  136. return
  137. request_list = [get_request_data(device_key, device_info) for device_key, device_info in device_status_list.items()]
  138. request_data = {
  139. "ip": current_ip,
  140. "maintenanceRequests": request_list
  141. }
  142. with ThreadPoolExecutor(max_workers=6) as executor:
  143. executor.submit(sync_device_req, request_data)
  144. def get_request_data(device_key, device_info):
  145. app_status_value = app_status.enable.value
  146. network_status_value = network_status.enable.value if check_network_status(
  147. device_key) else network_status.unable.value
  148. device_info['app_status'] = app_status_value
  149. device_info['network_status'] = network_status_value
  150. return {
  151. "appStatus": app_status_value,
  152. "connectStatus": device_info['connect_enable_status'],
  153. "deviceUuid": device_key,
  154. "executeStatus": device_info['execute_status'],
  155. "mobileNetworkStatus": network_status_value
  156. }
  157. # 同步设备状态
  158. def sync_device_req(data):
  159. # start_time = time.time()
  160. try:
  161. loggerKit.info("同步设备状态入参:{0}", json.dumps(data))
  162. response = requests.post(status_sync_url, json=data)
  163. if response.status_code == 200:
  164. loggerKit.info("同步设备状态成功!, 同步接口返回:{0}", response.text)
  165. else:
  166. loggerKit.error("同步设备状态成功失败, 状态码: {0}, text返回:{1}", response.status_code, response.text)
  167. except Exception as e:
  168. loggerKit.error("同步设备状态成功异常: {0}", str(e))
  169. # 获取锁
  170. def acquire_lock(lock_name, acquire_timeout=1):
  171. loggerKit.info("设备:{0}获取到redis锁", lock_name)
  172. end_time = time.time() + acquire_timeout
  173. lock_value = str(time.time() + lock_timeout + 1)
  174. while time.time() < end_time:
  175. if redis_client.set(lock_name, lock_value, nx=True, ex=lock_timeout):
  176. return True
  177. time.sleep(0.001)
  178. return False
  179. # 对已获取锁的key 进行赋值
  180. def redis_set(key_name, value, acquire_timeout=3 * 60 * 60):
  181. loggerKit.info("设备号:{0},存入json{1}", key_name, value)
  182. end_time = time.time() + acquire_timeout
  183. while time.time() < end_time:
  184. if redis_client.set(key_name, value, ex=lock_timeout):
  185. return True
  186. time.sleep(0.001)
  187. return False
  188. # 传入对应的设备号
  189. def device_work(device_id):
  190. try:
  191. # 校验都通过 根据设备id调用Java接口 获取任务
  192. request_data = {
  193. # 设备ID
  194. "deviceUuid": device_id,
  195. "taskType": "contentInteraction",
  196. "taskSubType": "trainAccount"
  197. }
  198. response = None
  199. try:
  200. response = requests.post(get_task_url_v3, json=request_data, timeout=5)
  201. loggerKit.info('thread[{0}=>{1}], 设备:{2} -> 拉取任务数据:{3}', threading.current_thread().name,
  202. threading.get_ident(), device_id,
  203. json.dumps(response.text, ensure_ascii=False))
  204. except Exception as task_error:
  205. loggerKit.error('thread[{0}=>{1}], 调用拉取任务异常:{2}', threading.current_thread().name,
  206. threading.get_ident(), str(task_error))
  207. release_lock(device_id)
  208. event_request('P0', '设备' + device_id + '任务拉取服务端接口异常', time.time(), str(task_error), 'error')
  209. return
  210. if not response.ok:
  211. loggerKit.error('thread[{0}=>{1}], 请求信息异常:{2}', threading.current_thread().name,
  212. threading.get_ident(),
  213. json.dumps(response.text, ensure_ascii=False))
  214. release_lock(device_id)
  215. params = event_request('P0', '设备' + device_id + '任务拉取失败', time.time(),
  216. json.loads(response.text).get('data'), 'error')
  217. requests.post(event_url, json=params)
  218. return
  219. data = json.loads(response.text).get('data')
  220. if data is None:
  221. release_lock(device_id)
  222. return
  223. redis_set(device_id + "_task_json", json.dumps(data))
  224. except Exception as result:
  225. loggerKit.error("thread[{0}=>{1}], 设备号:{3},任务异常:{4}", threading.current_thread().name,
  226. threading.get_ident(), device_id,
  227. json.dumps(result, ensure_ascii=False))
  228. release_lock(device_id)
  229. return
  230. # 发送钉钉告警短信请求拼接
  231. def event_request(level, alert_name, start_time, message, status):
  232. response = {
  233. "level": level,
  234. "alertname": alert_name,
  235. "startTime": start_time,
  236. "message": message,
  237. "status": status
  238. }
  239. return response
  240. # 释放锁
  241. def release_lock(lock_name):
  242. loggerKit.info("设备:{0}redis锁释放", lock_name)
  243. redis_client.delete(lock_name)
  244. # post请求
  245. def call_back_request(error_code, error_msg, task_id, task_status, task_execute_response, content, demo_flag):
  246. response = {
  247. "errorCode": error_code,
  248. "errorMsg": error_msg,
  249. "taskId": task_id,
  250. "taskStatus": task_status,
  251. "taskExecuteResponse": task_execute_response,
  252. "content": content,
  253. "demoFlag": demo_flag
  254. }
  255. return response
  256. def callback_task(err_code, err_msg, task_id, device_id, execute_status, result):
  257. callback_request2 = call_back_request(err_code, err_msg, task_id, execute_status, result, None, None)
  258. loggerKit.info("thread[{0}=>{1}], taskId:{2}, 设备号:{3}, url:{4}, 执行状态:{5}, result:{6} 。回调开始", threading.current_thread().name,
  259. threading.get_ident(), task_id, device_id, task_callback_url, execute_status, result)
  260. current_timeout = (5, 10)
  261. callback_response2 = requests.post(task_callback_url, json=callback_request2, timeout=current_timeout)
  262. loggerKit.info("thread[{0}=>{1}], taskId:{2},设备号:{3}。回调结束:{4}, 执行状态:{5}, result:{6}", threading.current_thread().name,
  263. threading.get_ident(), task_id, device_id, json.dumps(callback_response2.text, ensure_ascii=False),
  264. execute_status, result)
  265. return callback_response2
  266. def call_back_task_reply(err_code, err_msg, task_id, device_id, execute_status, result, content):
  267. callback_request2 = call_back_request_reply(err_code, err_msg, task_id, execute_status, result, content, None)
  268. loggerKit.info("thread[{0}=>{1}], taskId:{2}, 设备号:{3}, url:{4}, 执行状态:{5}, requestJson:{6} 。回调开始",
  269. threading.current_thread().name,
  270. threading.get_ident(), task_id, device_id, task_callback_url, execute_status, callback_request2)
  271. current_timeout = (5, 10)
  272. callback_response2 = requests.post(task_callback_url, json=callback_request2, timeout=current_timeout)
  273. loggerKit.info("thread[{0}=>{1}], taskId:{2},设备号:{3}。回调结束:{4}, 执行状态:{5}, result:{6}",
  274. threading.current_thread().name,
  275. threading.get_ident(), task_id, device_id, json.dumps(callback_response2.text, ensure_ascii=False),
  276. execute_status, result)
  277. return callback_response2
  278. """
  279. 回调参数(有评论内容版)
  280. """
  281. # post请求
  282. def call_back_request_reply(error_code, error_msg, task_id, task_status, task_execute_response, content, demo_flag):
  283. response = {
  284. "errorCode": error_code,
  285. "errorMsg": error_msg,
  286. "taskId": task_id,
  287. "taskStatus": task_status,
  288. "taskExecuteResponse": task_execute_response,
  289. "content": content,
  290. "demoFlag": demo_flag
  291. }
  292. return response
  293. # 回复详情
  294. class content_detail:
  295. def __init__(self, keyword, title, reply, account_name):
  296. """
  297. :rtype: object
  298. """
  299. # 搜索关键词
  300. self.keyword = keyword
  301. self.title = title
  302. self.reply = reply
  303. self.account_name = account_name
  304. def to_dict(self):
  305. return {
  306. 'keyword': self.keyword,
  307. 'title': self.title,
  308. 'reply': self.reply,
  309. 'accountName': self.account_name
  310. }