import base64 import os import time from func.action_func import del_key_vague from task.task_job import callback_task from tools import loggerKit, redis_client import scene.oprator.atom_data from tools.pic_base64_util import pic_to_base64 """ 检查任务状态 :param task_id 任务id :param data 客户端请求的参数字典 :param target_app app标识 :param package_name APP id :param target_version APP 版本 :param timeout 超时时间 s :param sleep_time 休眠时间 s """ def check_result_status( task_id, data, target_app, target_version, package_name, timeout=30, sleep_time=10 ): # 获取设备id device_id = data.get("deviceID") # 获取 指令结构体 result = data.get("result") # 指令结构体不为None if result is not None: # 获取指令执行结果 perform_action_result = result.get("performActionResult") if perform_action_result is None or perform_action_result == "failure": # 指令执行结果为:None 或 失败 return make_fail_result(device_id, task_id) if perform_action_result == "stop": # 指令执行结果为:终止 return make_stop_result(device_id, task_id) else: return None else: """ 启动指令 启动app """ return make_task_start_app( data, target_app=target_app, target_version=target_version, package_name=package_name, timeout=timeout, sleep_time=sleep_time ) """ 启动APP任务 :param data 客户端请求的参数字典 :param target_app app标识 :param target_version 目标APP的指定版本 :param package_name 目标APP的id :param timeout 超时时间 s :param sleep_time 休眠时间 s """ def make_task_start_app( data, target_app, target_version, package_name, timeout=30, sleep_time=10 ): # 获取设备id device_id = data.get("deviceID") # 生成新的任务id action_id = make_new_task_id() # 生成app启动json action_dict = scene.oprator.atom_data.start_app( request_id=action_id, target_app=target_app, target_version=target_version, package_name=package_name, timeout=timeout, sleep_time=sleep_time ) # 更新redis update_redis_step(device_id, action_id, 1, 9) # 记录日志 log_msg(device_id, "action0_id", action_id) # 返回生成app启动json字典 log_msg_return(device_id, action_dict) return action_dict """ 关闭APP任务 :param task_id 任务id :param data 客户端请求的参数字典 :param target_app app标识 :param target_version 目标APP的指定版本 :param package_name 目标APP的id :param timeout 超时时间 s :param sleep_time 休眠时间 s """ def make_task_stop_app( task_id, data, step_index, target_app, target_version, package_name, timeout=30, sleep_time=10 ): # 获取设备id device_id = data.get("deviceID") # 生成新的任务id action_id = make_new_task_id() # 生成app启动json action_dict = scene.oprator.atom_data.stop_app( request_id=action_id, target_app=target_app, target_version=target_version, package_name=package_name, timeout=timeout, sleep_time=sleep_time ) # 更新redis update_redis_step(device_id, action_id, 1, step_index) # 记录日志 log_msg(device_id, "action0_id", action_id) # 返回生成app启动json字典 log_msg_return(device_id, action_dict) end_task(device_id, task_id) return action_dict """ 点击原生控件任务 需指定控件id,从weditor中抓取 :param data 客户端请求的参数字典 :param step_index 当前指定到第几步 :param target_app app标识 :param target_version 目标APP的指定版本 :param package_name 目标APP的id :param control_ids :param control_id 控件的唯一id :param item_index :param timeout 超时时间 s :param sleep_time 休眠时间 s """ def make_task_click_widget( data, step_index, target_app, target_version, package_name, control_ids="", control_id="", item_index=0, timeout=30, sleep_time=10 ): # 获取设备id device_id = data.get("deviceID") # 获取指令id perform_action_id = data.get("performActionId") # 获取 指令结构体 result = data.get("result") # 获取任务执行状态 perform_action_result = result.get("performActionResult") step_id = read_redis_step_id(device_id, perform_action_id, step_index) # 获取redis缓存的操作id last_action_id = read_redis_last_operate_id(device_id) # 判断是否可以执行该步骤 if can_execute_step(step_id, last_action_id, perform_action_id, perform_action_result): # 生成新的任务id action_id = make_new_task_id() # 记录日志 log_msg(device_id, f"action{step_index}_id_{load_log_mark(step_index)}", int(last_action_id)) action_dict = scene.oprator.atom_data.single_click_by_control( action_id, target_app=target_app, target_version=target_version, package_name=package_name, control_ids=control_ids, control_id=control_id, item_index=item_index, timeout=timeout, sleep_time=sleep_time ) # 更新redis update_redis_step(device_id, action_id, step_index + 1, step_index) # 记录日志 log_msg(device_id, f"action{step_index}_id", action_id) # 返回新生成的操作json字典 log_msg_return(device_id, action_dict) return action_dict else: return None """ 点击文本 :param data 客户端请求的参数字典 :param step_index 当前指定到第几步 :param target_app app标识 :param package_name 目标APP的id :param target_version 目标APP的指定版本 :param text 要点击的文本 :param timeout 超时时间 s :param sleep_time 休眠时间 s """ def make_task_click_text( data, step_index, target_app, package_name, target_version, text, timeout=30, sleep_time=10 ): # 获取设备id device_id = data.get("deviceID") # 获取指令id perform_action_id = data.get("performActionId") # 获取 指令结构体 result = data.get("result") # 获取任务执行状态 perform_action_result = result.get("performActionResult") step_id = read_redis_step_id(device_id, perform_action_id, step_index) # 获取redis缓存的操作id last_action_id = read_redis_last_operate_id(device_id) # 判断是否可以执行该步骤 if can_execute_step(step_id, last_action_id, perform_action_id, perform_action_result): # 生成新的任务id action_id = make_new_task_id() # 记录日志 log_msg(device_id, f"action{step_index}_id_{load_log_mark(step_index)}", int(last_action_id)) # 生成点击图片字典 action_dict = scene.oprator.atom_data.single_click_by_text( request_id=action_id, target_app=target_app, target_version=target_version, package_name=package_name, text=text, timeout=timeout, sleep_time=sleep_time ) # 更新redis update_redis_step(device_id, action_id, step_index + 1, step_index) # 记录日志 log_msg(device_id, f"action{step_index}_id", action_id) # 返回新生成的操作json字典 log_msg_return(device_id, action_dict) return action_dict else: return None """ 滑动屏幕任务 :param data 客户端请求的参数字典 :param step_index 当前指定到第几步 :param target_app app标识 :param package_name 目标APP的id :param target_version 目标APP的指定版本 :param scale 滑动比例 :param direction 滑动方向 :param timeout 超时时间 s :param sleep_time 休眠时间 s """ def make_task_swipe_screen( data, step_index, target_app, package_name, target_version, scale=0.5, direction="up", timeout=30, sleep_time=10 ): # 获取设备id device_id = data.get("deviceID") # 获取指令id perform_action_id = data.get("performActionId") # 获取 指令结构体 result = data.get("result") # 获取任务执行状态 perform_action_result = result.get("performActionResult") step = read_redis_step_id(device_id, perform_action_id, step_index) # 获取redis缓存的操作id last_action_id = read_redis_last_operate_id(device_id) # 判断是否可以执行该步骤 if can_execute_step(step, last_action_id, perform_action_id, perform_action_result): # 生成新的任务id action_id = make_new_task_id() # 记录日志 log_msg(device_id, f"action{step_index}_id_{load_log_mark(step_index)}", int(last_action_id)) # 生成滑动屏幕操作json action_dict = scene.oprator.atom_data.swipe_screen( request_id=action_id, target_app=target_app, target_version=target_version, package_name=package_name, scale=scale, timeout=timeout, sleep_time=sleep_time, direction=direction ) # 更新redis update_redis_step(device_id, action_id, step_index + 1, step_index) # 记录日志 log_msg(device_id, f"action{step_index}_id", action_id) # 返回新生成的操作json字典 log_msg_return(device_id, action_dict) return action_dict else: return None """ 连续滑动屏幕任务 :param data 客户端请求的参数字典 :param step_index 当前指定到第几步 :param target_app app标识 :param package_name 目标APP的id :param target_version 目标APP的指定版本 :param scale 滑动比例 :param direction 滑动方向 :param timeout 超时时间 s :param sleep_time 休眠时间 s """ def make_task_continual_swipe_screen( data, step_index, target_app, package_name, target_version, scale=0.5, direction="up", is_need_loop=False, loop_count=1, timeout=30, sleep_time=10 ): # 获取设备id device_id = data.get("deviceID") # 获取指令id perform_action_id = data.get("performActionId") # 获取 指令结构体 result = data.get("result") # 获取任务执行状态 perform_action_result = result.get("performActionResult") step = read_redis_step_id(device_id, perform_action_id, step_index) # 获取redis缓存的操作id last_action_id = read_redis_last_operate_id(device_id) # 判断是否可以执行该步骤 if can_execute_step(step, last_action_id, perform_action_id, perform_action_result): # 生成新的任务id action_id = make_new_task_id() # 记录日志 log_msg(device_id, f"action{step_index}_id_{load_log_mark(step_index)}", int(last_action_id)) # 生成滑动屏幕操作json action_dict = scene.oprator.atom_data.swipe_screen( request_id=action_id, target_app=target_app, target_version=target_version, package_name=package_name, scale=scale, timeout=timeout, sleep_time=sleep_time, direction=direction ) if is_need_loop == False: # 更新redis update_redis_step(device_id, action_id, step_index + 1, step_index) else: loop_key = loop_key_format(device_id, str(step_index)) cur_loop_count = redis_client.get(loop_key) if cur_loop_count == None: cur_loop_count = 0 else: cur_loop_count = int(cur_loop_count) if cur_loop_count < loop_count: update_redis_step(device_id, action_id, step_index, step_index) cur_loop_count += 1 redis_client.set(loop_key, cur_loop_count) else : update_redis_step(device_id, action_id, step_index + 1, step_index) # 记录日志 log_msg(device_id, f"action{step_index}_id", action_id) # 返回新生成的操作json字典 log_msg_return(device_id, action_dict) return action_dict else: return None """ 点击图片任务,通过图片识别触发 需指定图片的文件路径 :param data 客户端请求的参数字典 :param step_index 当前指定到第几步 :param target_app app标识 :param package_name 目标APP的id :param target_version 目标APP的指定版本 :param pic_base64 图片的base64字符串 :param timeout 超时时间 s :param sleep_time 休眠时间 s """ def make_task_click_image( data, step_index, target_app, package_name, target_version, pic_base64, timeout=30, sleep_time=10 ): # 获取设备id device_id = data.get("deviceID") # 获取指令id perform_action_id = data.get("performActionId") # 获取 指令结构体 result = data.get("result") # 获取任务执行状态 perform_action_result = result.get("performActionResult") step_id = read_redis_step_id(device_id, perform_action_id, step_index) # 获取redis缓存的操作id last_action_id = read_redis_last_operate_id(device_id) # 判断是否可以执行该步骤 if can_execute_step(step_id, last_action_id, perform_action_id, perform_action_result): # 生成新的任务id action_id = make_new_task_id() # 记录日志 log_msg(device_id, f"action{step_index}_id_{load_log_mark(step_index)}", int(last_action_id)) # 生成点击图片字典 action_dict = scene.oprator.atom_data.click_pic( request_id=action_id, target_app=target_app, target_version=target_version, package_name=package_name, pic_base64=pic_base64, timeout=timeout, sleep_time=sleep_time ) # 更新redis update_redis_step(device_id, action_id, step_index + 1, step_index) # 记录日志 log_msg(device_id, f"action{step_index}_id", action_id) # 返回新生成的操作json字典 log_msg_return(device_id, action_dict) return action_dict else: return None """ 返回上一页 :param data 客户端请求的参数字典 :param step_index 当前指定到第几步 :param target_app app标识 :param package_name 目标APP的id :param target_version 目标APP的指定版本 :param timeout 超时时间 s :param sleep_time 休眠时间 s """ def make_task_click_back( data, step_index, target_app, package_name, target_version, timeout=30, sleep_time=10 ): # 获取设备id device_id = data.get("deviceID") # 获取指令id perform_action_id = data.get("performActionId") # 获取 指令结构体 result = data.get("result") # 获取任务执行状态 perform_action_result = result.get("performActionResult") step_id = read_redis_step_id(device_id, perform_action_id, step_index) # 获取redis缓存的操作id last_action_id = read_redis_last_operate_id(device_id) # 判断是否可以执行该步骤 if can_execute_step(step_id, last_action_id, perform_action_id, perform_action_result): # 生成新的任务id action_id = make_new_task_id() # 记录日志 log_msg(device_id, f"action{step_index}_id_{load_log_mark(step_index)}", int(last_action_id)) # 生成点击图片字典 action_dict = scene.oprator.atom_data.back_last_page( request_id=action_id, target_app=target_app, target_version=target_version, package_name=package_name, timeout=timeout, sleep_time=sleep_time ) # 更新redis update_redis_step(device_id, action_id, step_index + 1, step_index) # 记录日志 log_msg(device_id, f"action{step_index}_id", action_id) # 返回新生成的操作json字典 log_msg_return(device_id, action_dict) return action_dict else: return None """ 根据id输入文本内容 :param data 客户端请求的参数字典 :param step_index 当前指定到第几步 :param target_app app标识 :param package_name 目标APP的id :param target_version 目标APP的指定版本 :param control_id 控件的id :param content 要输入的文本 :param timeout 超时时间 s :param sleep_time 休眠时间 s """ def make_task_send_text( data, step_index, target_app, package_name, target_version, control_id, content, timeout=30, sleep_time=10 ): # 获取设备id device_id = data.get("deviceID") # 获取指令id perform_action_id = data.get("performActionId") # 获取 指令结构体 result = data.get("result") # 获取任务执行状态 perform_action_result = result.get("performActionResult") step_id = read_redis_step_id(device_id, perform_action_id, step_index) # 获取redis缓存的操作id last_action_id = read_redis_last_operate_id(device_id) # 判断是否可以执行该步骤 if can_execute_step(step_id, last_action_id, perform_action_id, perform_action_result): # 生成新的任务id action_id = make_new_task_id() # 记录日志 log_msg(device_id, f"action{step_index}_id_{load_log_mark(step_index)}", int(last_action_id)) # 生成点击图片字典 action_dict = scene.oprator.atom_data.send_text_by_control( request_id=action_id, target_app=target_app, target_version=target_version, package_name=package_name, control_id=control_id, content=content, timeout=timeout, sleep_time=sleep_time ) # 更新redis update_redis_step(device_id, action_id, step_index + 1, step_index) # 记录日志 log_msg(device_id, f"action{step_index}_id", action_id) # 返回新生成的操作json字典 log_msg_return(device_id, action_dict) return action_dict else: return None """ 生成日志标记 """ def load_log_mark(step): log_mark = "mem" if step == 1: log_mark = "start" return log_mark """ 生成新的任务id """ def make_new_task_id(): return int(round(time.time() * 1000)) """ 记录格式化日志 """ def log_msg(device_id, msg, action_id): loggerKit.info("设备:{0}, {1}:{2}", device_id, msg, action_id) """ 记录返回内容 """ def log_msg_return(device_id, result): loggerKit.info("返回数据:{0}, {1}", device_id, result) """ 判断是否可以执行某个步骤 """ def can_execute_step(step_id, last_action_id, perform_action_id, perform_action_result): if step_id is not None and \ int(step_id) == 1 and \ last_action_id is not None and \ int(perform_action_id) == int(last_action_id) and \ perform_action_result == "success": return True else: return False """ 根据图片名返回图片base64字符串 """ def read_pic_base64_string(pic_path): with open(pic_path, "rb") as image_file: try: encoded_string = "data:image/png;base64," + base64.b64encode(image_file.read()).decode('utf-8') except Exception as e: print(f"img_to_base64 error:{e}") return encoded_string """ 更新redis任务id状态 """ def update_redis_step(device_id, action_id, set_step, del_setp): """ 每次操作完成后会将对应的操作唯一id存储到redis 并且返回给手机端 手机端下次带着上个操作id来执行下一个操作 """ redis_client.delete(step_key_format(f"{device_id}_{action_id}", del_setp)) redis_client.set(operate_key_format(device_id), action_id) redis_client.set(step_key_format(f"{device_id}_{action_id}", set_step), "1") """ 生成redis操作key """ def operate_key_format(key): return f"{key}operate" """ 生成redis步骤key """ def step_key_format(key, step): return f"{key}_step{step}" """ 生成redis循环key """ def loop_key_format(key, step): return f"{key}_loop{step}" """ 生成终止json返回数据 """ def make_stop_result(device_id, task_id): return make_result(device_id, task_id, -2, "指令被用户终止") """ 生成失败json返回数据 """ def make_fail_result(device_id, task_id): return make_result(device_id, task_id, -2, "fail, performActionResult is null") """ 生成json返回数据 """ def make_result(device_id, task_id, code, message): return_dict = { "data": "", "code": str(code), "message": message } # del_key_vague(device_id) callback_task(500, '指令执行失败', task_id, device_id, 0, None) return return_dict """ 读取redis最后一次operate的id """ def read_redis_last_operate_id(device_id): return redis_client.get(device_id + "operate") """ 读取redis某一步的操作id """ def read_redis_step_id(device_id, action_id, step): if action_id == "" or action_id == None: return redis_client.get(f"{device_id}_step{step}") else: return redis_client.get(f"{device_id}_{action_id}_step{step}") """ 结束任务,恢复任务状态 """ def end_task(device_id, task_id): del_key_vague(device_id) # 回调任务中心修改任务状态 callback_task(None, None, task_id, device_id, 1, None)