""" 官媒账号互动操作(点赞收藏评论 按账号去重) """ import time from urllib.parse import quote from func.action_func import del_key_vague from scene.oprator.atom_data import single_click_by_control, start_app, get_content_by_control, single_click_by_text, \ many_click_by_position, stop_app, send_text_by_control, get_text_by_control, back_last_page, swipe_screen from tools import loggerKit, redis_client import json import httpx import threading import yaml import requests with open('config.yaml', 'r') as file: config = yaml.load(file, Loader=yaml.FullLoader) extern_domain = config['bmp-cp']['extern_domain'] # 任务执行回调url task_callback_url = extern_domain + config['bmp-cp']['task_callback_url'] # 调用AIGC接口 post_ai_gc_url = extern_domain + config['bmp-content-center']['comment_local_url'] get_commented_list_url = extern_domain + config['bmp-cp']['get_commented_list_url'] # 官媒任务 # 版本29.5.0 def douyin_random_watch_video(task_id, device_id, data): loggerKit.info('请求信息:{0}'.format(data)) device_id = data.get("deviceID") perform_action_id = data.get("performActionId") result = data.get("result") inner_data = data.get("data") # resource_name = inner_data.get("resourceName") extension_info = data['data']['extendInfo'] extension_info = json.loads(extension_info) resource_name = extension_info['author'] loggerKit.info("inner_data:{0}, keyword:{1}", inner_data, inner_data.get("resourceName")) if result is not None: """ 非首个指令 """ perform_action_result = result.get("performActionResult") if perform_action_result is None: return_dict = { "data": "", "code": -2, "message": "fail, performActionResult is null" } # 回调任务中心 del_key_vague(device_id) call_back_task_reply(500, '指令执行失败', task_id, device_id, 0, None, None) return return_dict # 指令执行失败 if perform_action_result == "failure": # 回调任务中心 return_dict = { "data": "", "code": -2, "message": "fail, performActionResult is null" } del_key_vague(device_id) call_back_task_reply(500, '指令执行失败', task_id, device_id, 0, None, None) return return_dict # 终止指令 if perform_action_result == "stop": # 回调任务中心 return_dict = { "data": "", "code": -2, "message": "指令被用户终止" } del_key_vague(device_id) call_back_task_reply(500, '指令被用户终止', task_id, device_id, 0, None, None) return return_dict # 终止指令 if perform_action_result == "eleNotFound": # 回调任务中心 return_dict = { "data": "", "code": -2, "message": "未找到签到指示" } del_key_vague(device_id) call_back_task_reply(500, '未找到签到指示,该账号当天可能已经签到过', task_id, device_id, 0, None, None) return return_dict # 元素未找到 if perform_action_result == "invalid operation": # 回调任务中心 return_dict = { "data": "", "code": -2, "message": "fail, performActionResult is not success" } del_key_vague(device_id) call_back_task_reply(500, '任务执行失败,元素未找到', task_id, device_id, 0, None, None) return return_dict """ 每次操作完成后会将对应的操作唯一id存储到redis 并且返回给手机端 手机端下次带着上个操作id来执行下一个操作 """ last_action_id = redis_client.get(device_id + "operate") step0 = redis_client.get(f"{device_id}_step0") if (step0 is not None and int(step0) == 1 and last_action_id is not None and int(perform_action_id) == int( last_action_id) and perform_action_result == "success"): action1_id = int(round(time.time() * 1000)) """ 发送第1条指令 点击:我(菜单) """ action1_dict = single_click_by_text(action1_id, target_app="douyin", target_version="28.8.0", package_name="com.ss.android.ugc.aweme.lite", text="我") redis_client.set(device_id + "operate", action1_id) redis_client.set(f"{device_id}_step1", "1") redis_client.delete(f"{device_id}_step0") loggerKit.info("taskId:{0}, action1_id:{1}", task_id, action1_id) return action1_dict # 获取到对应抖音账号并存入redis: step1 = redis_client.get(f"{device_id}_step1") if step1 is not None and int(step1) == 1 and last_action_id is not None and int(perform_action_id) == int( last_action_id): loggerKit.info("设备:{0}, last_action_id:{1}", device_id, int(last_action_id)) action2_id = int(round(time.time() * 1000)) """ 插播指令 获取当前的抖音账号 获取当前抖音账号 """ action2_dict = get_text_by_control(action2_id, control_id="com.ss.android.ugc.aweme.lite:id/ib7", target_app="douyin", target_version="28.8.0", package_name="com.ss.android.ugc.aweme.lite", sleep_time=3) redis_client.set(device_id + "operate", action2_id) redis_client.set(f"{device_id}_step_get_account", "1") redis_client.delete(f"{device_id}_step1") loggerKit.info("taskId:{0}, action2_id:{1}", task_id, action2_id) return action2_dict step1 = redis_client.get(f"{device_id}_step_get_account") if step1 is not None and int(step1) == 1 and last_action_id is not None and int(perform_action_id) == int( last_action_id): loggerKit.info("设备:{0}, last_action_id:{1}", device_id, int(last_action_id)) action2_id = int(round(time.time() * 1000)) # 获取当前登陆账号并存入缓存 account_name = result.get("performActionText") if account_name is None: # 回调任务中心 return_dict = { "data": "", "code": -2, "message": "未找到签到指示" } del_key_vague(device_id) call_back_task_reply(500, '未获取到账号信息', task_id, device_id, 0, None, None) return return_dict # 将当前设备操作的账号存入redis redis_client.set(device_id + 'account', account_name) """ 发送第2条指令 点击:关注 """ action2_dict = single_click_by_control(action2_id, control_id="com.ss.android.ugc.aweme.lite:id/o3+", target_app="douyin", target_version="28.8.0", package_name="com.ss.android.ugc.aweme.lite") redis_client.set(device_id + "operate", action2_id) redis_client.set(f"{device_id}_step2", "1") redis_client.delete(f"{device_id}_step_get_account") loggerKit.info("taskId:{0}, action2_id:{1}", task_id, action2_id) return action2_dict step2 = redis_client.get(f"{device_id}_step2") if step2 is not None and int(step2) == 1 and last_action_id is not None and int(perform_action_id) == int( last_action_id): loggerKit.info("设备:{0}, last_action_id:{1}", device_id, int(last_action_id)) action3_id = int(round(time.time() * 1000)) """ 发送第3条指令 点击:荣威ROEWE """ action3_dict = get_content_by_control(action3_id, title=resource_name, target_app="douyin", target_version="28.8.0", package_name="com.ss.android.ugc.aweme.lite") redis_client.set(device_id + "operate", action3_id) redis_client.set(f"{device_id}_step4", "1") redis_client.delete(f"{device_id}_step2") loggerKit.info("taskId:{0}, action3_id:{1}", task_id, action3_id) return action3_dict step5 = redis_client.get(f"{device_id}_step4") if step5 is not None and int(step5) == 1 and last_action_id is not None and int(perform_action_id) == int( last_action_id): loggerKit.info("设备:{0}, last_action_id:{1}", device_id, int(last_action_id)) action6_id = int(round(time.time() * 1000)) """ 发送第5条指令 随机点击某个视频 """ action6_dict = single_click_by_control(action6_id, control_id="com.ss.android.ugc.aweme.lite:id/container", control_ids="com.ss.android.ugc.aweme.lite:id/container", target_app="douyin", target_version="28.8.0", package_name="com.ss.android.ugc.aweme.lite") redis_client.set(device_id + "operate", action6_id) redis_client.set(f"{device_id}_step6", "1") redis_client.delete(f"{device_id}_step4") return action6_dict # 获取对应的视频时间戳 step5 = redis_client.get(f"{device_id}_step6") if step5 is not None and int(step5) == 1 and last_action_id is not None and int(perform_action_id) == int( last_action_id): loggerKit.info("设备:{0}, last_action_id:{1}", device_id, int(last_action_id)) action6_id = int(round(time.time() * 1000)) """ 插播指令 获取到对应视频的发布时间 """ action6_dict = get_text_by_control(action6_id, control_id="com.ss.android.ugc.aweme.lite:id/_+", target_app="douyin", target_version="28.8.0", package_name="com.ss.android.ugc.aweme.lite", sleep_time=5) redis_client.set(device_id + "operate", action6_id) redis_client.set(f"{device_id}_step_timestamp", "1") redis_client.delete(f"{device_id}_step6") return action6_dict # 判断该视频对应账号是否已经点赞过 step5 = redis_client.get(f"{device_id}_step_timestamp") if step5 is not None and int(step5) == 1 and last_action_id is not None and int(perform_action_id) == int( last_action_id): loggerKit.info("设备:{0}, last_action_id:{1}", device_id, int(last_action_id)) action6_id = int(round(time.time() * 1000)) timestamp = result.get("performActionText") redis_client.set(device_id + 'timestamp', timestamp) if timestamp is None or timestamp == '': # 回调任务中心 return_dict = { "data": "", "code": -2, "message": "fail, performActionResult is not success" } del_key_vague(device_id) call_back_task_reply(500, '任务执行失败,未获取到时间戳,无法判断是否执行过', task_id, device_id, 0, None, None) return return_dict """ 根据发布时间和用户账号 判断该用户是否对该帖子进行过操作 未操作过开始进行点赞 已经操作过则下滑下一个 """ # redis中获取到对应账号 account = redis_client.get(device_id + 'account') # 获取账号已做过的帖子集合 done_list = redis_client.lrange('douyin_done:' + account, 0, -1) if done_list is None: # 如果处理视频集合为空 表示未处理过视频 进行点赞收藏 action7_dict = many_click_by_position(action6_id, target_app="douyin", target_version="28.8.0", package_name="com.ss.android.ugc.aweme.lite") redis_client.set(device_id + "operate", action6_id) redis_client.set(f"{device_id}_step7", "1") redis_client.delete(f"{device_id}_step_timestamp") loggerKit.info("task:{0}账号{1}未对视频{2}进行过处理,开始进行点赞", task_id, account, timestamp) return action7_dict if timestamp in done_list: # 添加滑动次数 swipe_count = redis_client.incr(device_id + "swipe_count", 1) # 从0开始计数 滑动五次结束 if swipe_count >= 5: action7_dict = stop_app(action6_id, target_app="douyin", target_version="28.8.0", package_name="com.ss.android.ugc.aweme.lite", timeout=5) loggerKit.info("task:{0}账号{1}已对官媒{2}前五条视频做过处理", task_id, account, resource_name) # 回调任务中心修改任务状态 call_back_task_reply(None, "账号:" + account + "已经对账号:" + resource_name + "前五条视频进行过处理", task_id, device_id, 1, None, None) del_key_vague(device_id) return action7_dict # 如果该视频已经在已操作集合中 进行下滑 action7_dict = swipe_screen(action6_id, scale=1, target_app="douyin", target_version="28.8.0", package_name="com.ss.android.ugc.aweme.lite") redis_client.set(device_id + "operate", action6_id) redis_client.set(f"{device_id}_step6", "1") redis_client.delete(f"{device_id}_step_timestamp") loggerKit.info("task:{0}账号{1}已对视频{2}进行过处理,进行下滑操作", task_id, account, timestamp) return action7_dict action7_dict = single_click_by_control(action6_id, control_id="com.ss.android.ugc.aweme.lite:id/cs5", target_app="douyin", target_version="28.8.0", package_name="com.ss.android.ugc.aweme.lite") redis_client.set(device_id + "operate", action6_id) redis_client.set(f"{device_id}_step7", "1") redis_client.delete(f"{device_id}_step_timestamp") return action7_dict # 视频收藏 step7 = redis_client.get(f"{device_id}_step7") if step7 is not None and int(step7) == 1 and last_action_id is not None and int(perform_action_id) == int( last_action_id): loggerKit.info("设备:{0}, last_action_id:{1}", device_id, int(last_action_id)) action8_id = int(round(time.time() * 1000)) """ 发送第8条指令 视频收藏->长按收藏 """ action8_dict = single_click_by_control(action8_id, control_id="com.ss.android.ugc.aweme.lite:id/bnb", target_app="douyin", target_version="28.8.0", package_name="com.ss.android.ugc.aweme.lite") redis_client.set(device_id + "operate", action8_id) redis_client.set(f"{device_id}_step8", "1") redis_client.delete(f"{device_id}_step7") return action8_dict # 获取帖子标题 step8 = redis_client.get(f"{device_id}_step8") if step8 is not None and int(step8) == 1 and last_action_id is not None and int(perform_action_id) == int( last_action_id): loggerKit.info("设备:{0}, last_action_id:{1}", device_id, int(last_action_id)) action9_id = int(round(time.time() * 1000)) """ 发送第9条指令 获取帖子标题 """ action9_dict = get_text_by_control(action9_id, target_app="douyin", target_version="28.8.0", package_name="com.ss.android.ugc.aweme.lite", max_page='1', control_id="com.ss.android.ugc.aweme.lite:id/desc", timeout=5) redis_client.set(device_id + "operate", action9_id) redis_client.set(f"{device_id}_step9", "1") redis_client.delete(f"{device_id}_step8") return action9_dict step9 = redis_client.get(f"{device_id}_step9") if step9 is not None and int(step9) == 1 and last_action_id is not None and int(perform_action_id) == int( last_action_id) \ and perform_action_result == "success": loggerKit.info("设备:{0}, action4_id_mem:{1}", device_id, int(last_action_id)) action10_id = int(round(time.time() * 1000)) """ 发送第10条指令 根据视频内容 生成评论文案将文案缓存 并且点击评论 """ result_text = result.get("performActionText") # if result_text is None: # return_dict = { # "data": "", # "code": -2, # "message": "fail, result_text is null" # } # # # 回调任务中心 # # del_key_vague(device_id) # # callback_task_reply(500, '获取到的内容标题为null,无法评论', task_id, device_id, 0, None) # # return return_dict redis_client.set(device_id + "douyin" + "comments_title", result_text) comment_response = httpx.get(get_commented_list_url + '?resourceName=' + result_text, timeout=120) # 评论实体 comment_body = json.loads(comment_response.text) # 评论集合 comment_list = comment_body.get('data') loggerKit.info("任务id:{0}对应标题:{1},已有评论:{2}", task_id, result_text, comment_list) existing_comment = '' if comment_list is not None: comment_list = list(set(comment_list)) existing_comment = '||'.join(comment_list) loggerKit.info("任务id:{0}对应标题:{1},拼接后的评论集合:{2}", task_id, result_text, existing_comment) existing_comment = quote(existing_comment) request_data = { "mode": "1", "title": result_text, "comment": existing_comment, "platform": "douyin" } loggerKit.info("任务id:{0}对应标题:{1},请求AIGC信息:{2}", task_id, result_text, request_data) response = httpx.post(post_ai_gc_url, json=request_data, timeout=120) # 点击评论框 action10_dict = single_click_by_control(action10_id, target_app="douyin", target_version="28.8.0", package_name="com.ss.android.ugc.aweme.lite", control_id="com.ss.android.ugc.aweme.lite:id/comment_container", timeout=5) if not response.is_success: # 调用AIGC获取评论失败 loggerKit.info("任务id:{0}对应标题:{1},请求AIGC失败信息:{2},返回信息:{3}", task_id, result_text, request_data, response) redis_client.set(device_id + "douyin" + "comments", '不错👍') redis_client.set(device_id + "operate", action10_id) redis_client.set(f"{device_id}_step10", "1") redis_client.delete(f"{device_id}_step9") loggerKit.info("设备:{0}, action10_id:{1}", device_id, action10_id) return action10_dict response_body = json.loads(response.text) reply_data = response_body.get('data') reply = reply_data.get('comment') if '内容太少' in reply or '对不起' in reply or reply is None or reply == '' or '提供' in reply: # 获取到的评论内容不符合 loggerKit.info("任务id:{0}对应标题:{1},请求AIGC失败信息:{2},返回信息:{3}", task_id, result_text, request_data, response) redis_client.set(device_id + "douyin" + "comments", '不错👍') redis_client.set(device_id + "operate", action10_id) redis_client.set(f"{device_id}_step10", "1") redis_client.delete(f"{device_id}_step9") loggerKit.info("设备:{0}, action10_id:{1}", device_id, action10_id) return action10_dict # 将需要评论的内容存入缓存 redis_client.set(device_id + "douyin" + "comments", reply) redis_client.set(device_id + "operate", action10_id) redis_client.set(f"{device_id}_step10", "1") redis_client.delete(f"{device_id}_step9") loggerKit.info("设备:{0}, action10_id:{1}", device_id, action10_id) return action10_dict step10 = redis_client.get(f"{device_id}_step10") if step10 is not None and int(step10) == 1 and last_action_id is not None and int(perform_action_id) == int( last_action_id) \ and perform_action_result == "success": loggerKit.info("设备:{0}, action11_id_mem:{1}", device_id, int(last_action_id)) action11_id = int(round(time.time() * 1000)) """ 发送第11条指令 点击评论框 """ reply = redis_client.get(device_id + "douyin" + "comments") if reply is None: # 调用AIGC获取评论失败 del_key_vague(device_id) return_dict = { "data": "", "code": -2, "message": "fail, result_text is null" } call_back_task_reply(500, '获取到的内容标题为null,无法评论', task_id, device_id, 0, None, None) return return_dict action11_dict = single_click_by_control(action11_id, target_app="douyin", target_version="28.8.0", package_name="com.ss.android.ugc.aweme.lite", control_id="com.ss.android.ugc.aweme.lite:id/bq0", timeout=5) redis_client.set(device_id + "operate", action11_id) redis_client.set(f"{device_id}_step11", "1") redis_client.delete(f"{device_id}_step10") loggerKit.info("设备:{0}, action11_id:{1}", device_id, action11_id) return action11_dict step11 = redis_client.get(f"{device_id}_step11") if step11 is not None and int(step11) == 1 and last_action_id is not None and int(perform_action_id) == int( last_action_id) \ and perform_action_result == "success": loggerKit.info("设备:{0}, action12_id_mem:{1}", device_id, int(last_action_id)) action12_id = int(round(time.time() * 1000)) """ 发送第12条指令 对评论框进行赋值 """ reply = redis_client.get(device_id + "douyin" + "comments") if reply is None: # 调用AIGC获取评论失败 del_key_vague(device_id) return_dict = { "data": "", "code": -2, "message": "fail, result_text is null" } call_back_task_reply(500, '获取到的内容标题为null,无法评论', task_id, device_id, 0, None, None) return return_dict action12_dict = send_text_by_control(action12_id, target_app="douyin", target_version="28.8.0", package_name="com.ss.android.ugc.aweme.lite", control_id="com.ss.android.ugc.aweme.lite:id/bq0", content=reply, timeout=5) redis_client.set(device_id + "operate", action12_id) redis_client.set(f"{device_id}_step12", "1") redis_client.delete(f"{device_id}_step11") comment_title = redis_client.get(device_id + "douyin" + "comments_title") content_result = content_detail(resource_name, comment_title, reply, None) redis_client.set(device_id + "reply_json", json.dumps(content_result.to_dict(), ensure_ascii=False)) loggerKit.info("设备:{0}, action12_id:{1}", device_id, action12_id) return action12_dict step12 = redis_client.get(f"{device_id}_step12") if step12 is not None and int(step12) == 1 and last_action_id is not None and int(perform_action_id) == int( last_action_id) \ and perform_action_result == "success": loggerKit.info("设备:{0}, action13_id_mem:{1}", device_id, int(last_action_id)) action13_id = int(round(time.time() * 1000)) """ 发送第13条指令 发送评论 """ action12_dict = single_click_by_control(action13_id, target_app="douyin", target_version="28.8.0", package_name="com.ss.android.ugc.aweme.lite", control_id="com.ss.android.ugc.aweme.lite:id/bg=", timeout=5) redis_client.set(device_id + "operate", action13_id) redis_client.set(f"{device_id}_step13", "1") redis_client.delete(f"{device_id}_step12") loggerKit.info("设备:{0}, action13_id:{1}", device_id, action12_dict) return action12_dict step13 = redis_client.get(f"{device_id}_step13") if step13 is not None and int(step13) == 1 and last_action_id is not None and int(perform_action_id) == int( last_action_id) \ and perform_action_result == "success": loggerKit.info("设备:{0}, action13_id_mem:{1}", device_id, int(last_action_id)) action14_id = int(round(time.time() * 1000)) # 评论发送成功,将帖子表识存入到已处理过的集合中 account = redis_client.get(device_id + 'account') title = redis_client.get(device_id + 'timestamp') redis_client.left_push('douyin_done:' + account, title) """ 发送第14条指令 返回上一页 """ action14_dict = back_last_page(action14_id, target_app="douyin", target_version="28.8.0", package_name="com.ss.android.ugc.aweme.lite", sleep_time=3) redis_client.set(device_id + "operate", action14_id) redis_client.set(f"{device_id}_step14", "1") redis_client.delete(f"{device_id}_step13") loggerKit.info("设备:{0}, action14_id:{1}", device_id, action14_id) return action14_dict step14 = redis_client.get(f"{device_id}_step14") if step14 is not None and int(step14) == 1 and last_action_id is not None and int(perform_action_id) == int( last_action_id) \ and perform_action_result == "success": loggerKit.info("设备:{0}, action13_id_mem:{1}", device_id, int(last_action_id)) action15_id = int(round(time.time() * 1000)) """ 发送第15条指令 浏览90s """ action15_dict = many_click_by_position(action15_id, target_app="douyin", target_version="28.8.0", package_name="com.ss.android.ugc.aweme.lite", count=1, sleep_time=90) redis_client.set(device_id + "operate", action15_id) redis_client.set(f"{device_id}_step15", "1") redis_client.delete(f"{device_id}_step14") loggerKit.info("设备:{0}, action15_id:{1}", device_id, action15_id) return action15_dict """ 停止app """ step15 = redis_client.get(f"{device_id}_step15") if step15 is not None and int(step15) == 1 and last_action_id is not None and int(perform_action_id) == int( last_action_id) and perform_action_result == "success": loggerKit.info("设备:{0}, action14_id_mem:{1}", device_id, int(last_action_id)) action16_id = int(round(time.time() * 1000)) """ 停止指令 停止app """ action14_dict = stop_app(action16_id, target_app="douyin", target_version="28.8.0", package_name="com.ss.android.ugc.aweme.lite", timeout=5) redis_client.delete(f"{device_id}_step4") loggerKit.info("设备:{0}, action14_id:{1}", device_id, action16_id) reply_json = redis_client.get(device_id + "reply_json") # 回调任务中心修改任务状态 call_back_task_reply(None, None, task_id, device_id, 1, None, reply_json) del_key_vague(device_id) return action14_dict else: action0_id = int(round(time.time() * 1000)) """ 启动指令 启动app """ action0_dict = start_app(action0_id, target_app="douyin", target_version="29.5.0", package_name="com.ss.android.ugc.aweme.lite") redis_client.set(device_id + "operate", action0_id) redis_client.set(f"{device_id}_step0", "1") redis_client.delete(f"{device_id}_step11") loggerKit.info("设备:{0}, action0_id:{1}", device_id, action0_id) return action0_dict def call_back_task_reply(err_code, err_msg, task_id, device_id, execute_status, result, content): callback_request2 = call_back_request(err_code, err_msg, task_id, execute_status, result, content, None) loggerKit.info("thread[{0}=>{1}], taskId:{2}, 设备号:{3}, url:{4}, 执行状态:{5}, requestJson:{6} 。回调开始", threading.current_thread().name, threading.get_ident(), task_id, device_id, task_callback_url, execute_status, callback_request2) current_timeout = (5, 10) callback_response2 = requests.post(task_callback_url, json=callback_request2, timeout=current_timeout) loggerKit.info("thread[{0}=>{1}], taskId:{2},设备号:{3}。回调结束:{4}, 执行状态:{5}, result:{6}", threading.current_thread().name, threading.get_ident(), task_id, device_id, json.dumps(callback_response2.text, ensure_ascii=False), execute_status, result) return callback_response2 """ 添加了其他参数 """ # post请求 def call_back_request(error_code, error_msg, task_id, task_status, task_execute_response, content, demo_flag): response = { "errorCode": error_code, "errorMsg": error_msg, "taskId": task_id, "taskStatus": task_status, "taskExecuteResponse": task_execute_response, "content": content, "demoFlag": demo_flag } return response # 回复详情 class content_detail: def __init__(self, keyword, title, reply, account_name): """ :rtype: object """ # 搜索关键词 self.keyword = keyword self.title = title self.reply = reply self.account_name = account_name def to_dict(self): return { 'keyword': self.keyword, 'title': self.title, 'reply': self.reply, 'accountName': self.account_name }