""" 懂车帝简单互动 """ import json import time import httpx import yaml from urllib.parse import quote from func.action_func import del_key_vague from scene.oprator.atom_data import single_click_by_control, send_text_by_control, single_click_by_text, \ get_content_by_control, stop_app, start_app from task.task_job import callback_task from tools import redis_client, loggerKit import requests import threading with open('config.yaml', 'r') as file: config = yaml.load(file, Loader=yaml.FullLoader) extern_domain = config['bmp-cp']['extern_domain'] # 任务执行回调url task_callback_url = extern_domain + config['bmp-cp']['task_callback_url'] # 调用AIGC接口 post_ai_gc_url = extern_domain + config['bmp-content-center']['comment_local_url'] get_commented_list_url = extern_domain + config['bmp-cp']['get_commented_list_url'] def simple_comment(task_id, device_id, data): loggerKit.info('请求信息:{0}'.format(data)) device_id = data.get("deviceID") perform_action_id = data.get("performActionId") result = data.get("result") inner_data = data.get("data") # title = inner_data.get("resourceName") # author = inner_data.get("subResourceName") extension_info = data['data']['extendInfo'] extension_info = json.loads(extension_info) title = extension_info['title'] author = extension_info['author'] loggerKit.info("inner_data:{0}, keyword:{1}", inner_data, inner_data.get("resourceName")) if result is not None: """ 非首个指令 """ perform_action_result = result.get("performActionResult") if perform_action_result is None: return_dict = { "data": "", "code": -2, "message": "fail, performActionResult is null" } # 回调任务中心 del_key_vague(device_id) callback_task1(500, '指令执行失败', task_id, device_id, 0, None, None) return return_dict # 指令执行失败 if perform_action_result == "failure": # 回调任务中心 return_dict = { "data": "", "code": -2, "message": "fail, performActionResult is null" } del_key_vague(device_id) callback_task1(500, '指令执行失败', task_id, device_id, 0, None, None) return return_dict # 元素未找到 if perform_action_result == "invalid operation": # 回调任务中心 return_dict = { "data": "", "code": -2, "message": "fail, performActionResult is null" } del_key_vague(device_id) callback_task1(500, '指令执行失败', task_id, device_id, 0, None, None) return return_dict # 终止指令 if perform_action_result == "stop": # 回调任务中心 return_dict = { "data": "", "code": -2, "message": "指令被用户终止" } del_key_vague(device_id) callback_task1(500, '指令被用户终止', task_id, device_id, 0, None, None) return return_dict # 终止指令 if perform_action_result == "eleNotFound": # 回调任务中心 return_dict = { "data": "", "code": -2, "message": "未找到签到指示" } del_key_vague(device_id) callback_task1(500, '未找到签到指示,该账号当天可能已经签到过', task_id, device_id, 0, None, None) return return_dict # 元素未找到 if perform_action_result == "invalid operation": # 回调任务中心 return_dict = { "data": "", "code": -2, "message": "fail, performActionResult is not success" } del_key_vague(device_id) callback_task1(500, '任务执行失败,元素未找到', task_id, device_id, 0, None, None) return return_dict """ 每次操作完成后会将对应的操作唯一id存储到redis 并且返回给手机端 手机端下次带着上个操作id来执行下一个操作 """ last_action_id = redis_client.get(device_id + "operate") step0 = redis_client.get(f"{device_id}_step0") if step0 is not None and int(step0) == 1 and last_action_id is not None and int(perform_action_id) == int( last_action_id) and perform_action_result == "success": action1_id = int(round(time.time() * 1000)) """ 发送第1条指令 懂车帝APP首页点击搜索框 """ action1_dict = single_click_by_control(action1_id, target_version="7.9.0", control_id="com.ss.android.auto:id/c9c") redis_client.set(device_id + "operate", action1_id) redis_client.set(f"{device_id}_step1", "1") redis_client.delete(f"{device_id}_step0") loggerKit.info("taskId:{0}, action1_id:{1}", task_id, action1_id) return action1_dict step1 = redis_client.get(f"{device_id}_step1") if step1 is not None and int(step1) == 1 and last_action_id is not None and int(perform_action_id) == int( last_action_id): loggerKit.info("设备:{0}, last_action_id:{1}", device_id, int(last_action_id)) action2_id = int(round(time.time() * 1000)) """ 发送第2条指令 在搜索框输入关键词 """ action2_dict = send_text_by_control(action2_id, control_id="com.ss.android.auto:id/h5q", content=f'{author}') redis_client.set(device_id + "operate", action2_id) redis_client.set(f"{device_id}_step2", "1") redis_client.delete(f"{device_id}_step1") loggerKit.info("taskId:{0}, action2_id:{1}", task_id, action2_id) return action2_dict step2 = redis_client.get(f"{device_id}_step2") if step2 is not None and int(step2) == 1 and last_action_id is not None and int(perform_action_id) == int( last_action_id): loggerKit.info("设备:{0}, last_action_id:{1}", device_id, int(last_action_id)) action3_id = int(round(time.time() * 1000)) """ 发送第3条指令 点击搜索 """ action3_dict = single_click_by_control(action3_id, control_id="com.ss.android.auto:id/g9e") redis_client.set(device_id + "operate", action3_id) redis_client.set(f"{device_id}_step3", "1") redis_client.delete(f"{device_id}_step2") loggerKit.info("taskId:{0}, action3_id:{1}", task_id, action3_id) return action3_dict # step3 = redis_client.get(f"{device_id}_step3") # if step3 is not None and int(step3) == 1 and last_action_id is not None and int(perform_action_id) == int( # last_action_id): # loggerKit.info("设备:{0}, last_action_id:{1}", device_id, int(last_action_id)) # action4_id = int(round(time.time() * 1000)) # """ # 发送第4条指令 # 点击tab->口碑 # """ # action4_dict = single_click_by_text(action4_id, text="二手车") # redis_client.set(device_id + "operate", action4_id) # redis_client.set(f"{device_id}_step4", "1") # redis_client.delete(f"{device_id}_step3") # # loggerKit.info("taskId:{0}, action4_id:{1}", device_id, action4_id) # # return action4_dict # # step4 = redis_client.get(f"{device_id}_step4") # if step4 is not None and int(step4) == 1 and last_action_id is not None and int(perform_action_id) == int( # last_action_id): # loggerKit.info("设备:{0}, last_action_id:{1}", device_id, int(last_action_id)) # action5_id = int(round(time.time() * 1000)) # """ # 发送第5条指令 # 点击tab->车友圈 # """ # action5_dict = single_click_by_text(action5_id, text="车友圈") # redis_client.set(device_id + "operate", action5_id) # redis_client.set(f"{device_id}_step5", "1") # redis_client.delete(f"{device_id}_step4") # # return action5_dict # # step5 = redis_client.get(f"{device_id}_step5") # if step5 is not None and int(step5) == 1 and last_action_id is not None and int(perform_action_id) == int( # last_action_id): # loggerKit.info("设备:{0}, last_action_id:{1}", device_id, int(last_action_id)) # action6_id = int(round(time.time() * 1000)) # """ # 发送第6条指令 # 点击tab->用户 # """ # action6_dict = single_click_by_text(action6_id, text="用户") # # redis_client.set(device_id + "operate", action6_id) # redis_client.set(f"{device_id}_step6", "1") # redis_client.delete(f"{device_id}_step5") # # return action6_dict step3 = redis_client.get(f"{device_id}_step3") if step3 is not None and int(step3) == 1 and last_action_id is not None and int(perform_action_id) == int( last_action_id): loggerKit.info("设备:{0}, last_action_id:{1}", device_id, int(last_action_id)) action4_id = int(round(time.time() * 1000)) """ 直接点击用户名称 """ action4_dict = single_click_by_text(action4_id, text=author) redis_client.set(device_id + "operate", action4_id) redis_client.set(f"{device_id}_step6", "1") redis_client.delete(f"{device_id}_step3") loggerKit.info("taskId:{0}, action4_id:{1}", device_id, action4_id) return action4_dict # 判断是否有搜索结果 step6 = redis_client.get(f"{device_id}_step6") if step6 is not None and int(step6) == 1 and last_action_id is not None and int(perform_action_id) == int( last_action_id): loggerKit.info("设备:{0}, last_action_id:{1}", device_id, int(last_action_id)) action7_id = int(round(time.time() * 1000)) """ 点击搜素出来的博主 """ action7_dict = single_click_by_text(action7_id, text=f"{author}") redis_client.set(device_id + "operate", action7_id) redis_client.set(f"{device_id}_step7", "1") redis_client.delete(f"{device_id}_step6") return action7_dict # 存在搜索结果 step7 = redis_client.get(f"{device_id}_step7") if step7 is not None and int(step7) == 1 and last_action_id is not None and int(perform_action_id) == int( last_action_id): loggerKit.info("设备:{0}, last_action_id:{1}", device_id, int(last_action_id)) action8_id = int(round(time.time() * 1000)) """ 存在搜索结果,已经跳转到了用户中心首页 开始匹配文章 """ action8_dict = get_content_by_control(action8_id, control_id="android.widget.TextView", title=f'{title}') redis_client.set(device_id + "operate", action8_id) redis_client.set(f"{device_id}_step8", "1") redis_client.delete(f"{device_id}_step7") return action8_dict # 文章评论,点击铅笔父节点 step8 = redis_client.get(f"{device_id}_step8") if step8 is not None and int(step8) == 1 and last_action_id is not None and int(perform_action_id) == int( last_action_id): loggerKit.info("设备:{0}, last_action_id:{1}", device_id, int(last_action_id)) action9_id = int(round(time.time() * 1000)) """ 文章匹配,点击评论 """ loggerKit.info("action9_id:{0}, 文章评论,点击铅笔父节点", action9_id) action9_dict = single_click_by_control(action9_id, control_id="com.ss.android.auto:id/d98") redis_client.set(device_id + "operate", action9_id) redis_client.set(f"{device_id}_step9", "1") redis_client.delete(f"{device_id}_step8") return action9_dict """ 文章匹配,点击输入框,返回回复内容 """ step9 = redis_client.get(f"{device_id}_step9") if step9 is not None and int(step9) == 1 and last_action_id is not None and int(perform_action_id) == int( last_action_id): loggerKit.info("设备:{0}, last_action_id:{1}", device_id, int(last_action_id)) action10_id = int(round(time.time() * 1000)) """ 文章匹配,点击输入框,返回回复内容 """ loggerKit.info("action10_id:{0}, 文章title:{1}, 回复内容:{2}") comment_response = httpx.get(get_commented_list_url + '?resourceName=' + title, timeout=120) # 评论实体 comment_body = json.loads(comment_response.text) # 评论集合 comment_list = comment_body.get('data') loggerKit.info("任务id:{0}对应标题:{1},已有评论:{2}", task_id, title, comment_list) existing_comment = '' if comment_list is not None: comment_list = list(set(comment_list)) existing_comment = '||'.join(comment_list) loggerKit.info("任务id:{0}对应标题:{1},拼接后的评论集合:{2}", task_id, title, existing_comment) existing_comment = quote(existing_comment) request_data = { "platform": "dcd", "content": title, "comment": existing_comment, "mode": "1" } response = httpx.post(post_ai_gc_url, json=request_data, timeout=120) loggerKit.info("任务id:{0}获取AIGC评论结果:{1}", task_id, response.text) reply_content = "不错👍" if response.is_success: response_body = json.loads(response.text) data = response_body.get('data') reply_content = data.get('comment') if len(reply_content) == 0: reply_content = "赞👍" content_result = content_detail(title, author, reply_content, None) redis_client.set(device_id + "reply_json", json.dumps(content_result.to_dict(), ensure_ascii=False)) action10_dict = send_text_by_control(action10_id, control_id="com.ss.android.auto:id/byd", content=f'{reply_content}') redis_client.set(device_id + "operate", action10_id) redis_client.set(f"{device_id}_step10", "1") redis_client.delete(f"{device_id}_step9") return action10_dict """ 点击发送按钮 """ step10 = redis_client.get(f"{device_id}_step10") if step10 is not None and int(step10) == 1 and last_action_id is not None and int(perform_action_id) == int( last_action_id): loggerKit.info("设备:{0}, last_action_id:{1}", device_id, int(last_action_id)) action11_id = int(round(time.time() * 1000)) """ 文章匹配,点击输入框,返回回复内容 """ action11_dict = single_click_by_control(action11_id, control_id="com.ss.android.auto:id/jxq") redis_client.set(device_id + "operate", action11_id) redis_client.set(f"{device_id}_step11", "1") redis_client.delete(f"{device_id}_step10") return action11_dict """ 停止app """ step11 = redis_client.get(f"{device_id}_step11") if step11 is not None and int(step11) == 1 and last_action_id is not None and int(perform_action_id) == int( last_action_id): loggerKit.info("设备:{0}, last_action_id:{1}", device_id, int(last_action_id)) action12_id = int(round(time.time() * 1000)) """ 停止指令 停止app """ action12_dict = stop_app(action12_id, target_version="7.9.0") loggerKit.info("设备:{0}, action12_id:{1}", device_id, action12_dict) reply_json = redis_client.get(device_id + "reply_json") callback_task1(None, None, task_id, device_id, 1, None, reply_json) del_key_vague(device_id) return action12_dict else: action0_id = int(round(time.time() * 1000)) """ 启动指令 启动app """ action0_dict = start_app(action0_id, target_version="7.9.0") redis_client.set(device_id + "operate", action0_id) redis_client.set(f"{device_id}_step0", "1") redis_client.delete(f"{device_id}_step11") loggerKit.info("设备:{0}, action0_id:{1}", device_id, action0_id) return action0_dict def callback_task1(err_code, err_msg, task_id, device_id, execute_status, result, content): callback_request2 = call_back_request(err_code, err_msg, task_id, execute_status, result, content, None) loggerKit.info("thread[{0}=>{1}], taskId:{2}, 设备号:{3}, url:{4}, 执行状态:{5}, requestJson:{6} 。回调开始", threading.current_thread().name, threading.get_ident(), task_id, device_id, task_callback_url, execute_status, callback_request2) current_timeout = (5, 10) callback_response2 = requests.post(task_callback_url, json=callback_request2, timeout=current_timeout) loggerKit.info("thread[{0}=>{1}], taskId:{2},设备号:{3}。回调结束:{4}, 执行状态:{5}, result:{6}", threading.current_thread().name, threading.get_ident(), task_id, device_id, json.dumps(callback_response2.text, ensure_ascii=False), execute_status, result) return callback_response2 # post请求 def call_back_request(error_code, error_msg, task_id, task_status, task_execute_response, content, demo_flag): response = { "errorCode": error_code, "errorMsg": error_msg, "taskId": task_id, "taskStatus": task_status, "taskExecuteResponse": task_execute_response, "content": content, "demoFlag": demo_flag } return response # 回复详情 class content_detail: def __init__(self, keyword, title, reply, account_name): """ :rtype: object """ # 搜索关键词 self.keyword = keyword self.title = title self.reply = reply self.account_name = account_name def to_dict(self): return { 'keyword': self.keyword, 'title': self.title, 'reply': self.reply, 'accountName': self.account_name }