# Douyin comments: sentiment (positive/negative) check and liking.
import time

from func.action_func import del_key_vague
from task.task_job import callback_task
from tools import loggerKit, redis_client
from scene.oprator.atom_data import start_app, stop_app, get_text_by_control, single_click_by_control, \
    send_text_by_control, single_click_by_text, swipe_screen, spider_content_parent_find_click, \
    get_text_by_control_comment
import yaml
import json
import httpx

# NOTE(review): FullLoader is kept as in the original; yaml.safe_load would be
# the stricter choice if config.yaml only contains plain scalars/maps -- confirm.
with open('config.yaml', 'r') as file:
    config = yaml.load(file, Loader=yaml.FullLoader)

extern_domain = config['bmp-cp']['extern_domain']
# Task execution callback URL.
task_callback_url = extern_domain + config['bmp-cp']['task_callback_url']
# Comment sentiment (positive/negative) URL.
comment_direction_url = extern_domain + config['bmp-content-center']['comment_direction_url']
# URL used to record a successfully liked comment.
user_operate_add_url = extern_domain + config['bmp-cp']['user_operate_add_url']
# URL used to ask whether this account already handled a given comment.
user_operate_is_like_url = extern_domain + config['bmp-cp']['user_operate_is_like_url']

# Keyword arguments shared by every command sent after the app was started.
_APP = {
    "target_app": "douyin",
    "target_version": "28.8.0",
    "package_name": "com.ss.android.ugc.aweme.lite",
}
# Control id of a comment's text inside the comment list.
_COMMENT_CONTROL_ID = "com.ss.android.ugc.aweme.lite:id/content"
# A task stops once this many comments have been processed.
_MAX_FINISHED_COMMENTS = 20


def _new_action_id():
    """Return a millisecond timestamp used as the unique id of a command."""
    return int(round(time.time() * 1000))


def _is_active_step(step_key, perform_action_id, last_action_id):
    """Return True when ``step_key`` is flagged "1" and the device is answering the last command sent."""
    flag = redis_client.get(step_key)
    return (
        flag is not None
        and int(flag) == 1
        and last_action_id is not None
        and int(perform_action_id) == int(last_action_id)
    )


def _advance(device_id, action_id, next_step, done_step):
    """Store the id of the command just issued and move the step flag from ``done_step`` to ``next_step``."""
    redis_client.set(device_id + "operate", action_id)
    redis_client.set(f"{device_id}_{next_step}", "1")
    redis_client.delete(f"{device_id}_{done_step}")


def _abort(task_id, device_id, callback_message, message):
    """Clear the device's keys, report failure (500) to the task center and build the error reply."""
    del_key_vague(device_id)
    callback_task(500, callback_message, task_id, device_id, 0, None)
    return {
        "data": "",
        "code": -2,
        "message": message
    }


def _fetch_first_comment(action_id, **extra):
    """Build a "read comment text at index 0" command (used to trigger the next round)."""
    return get_text_by_control_comment(action_id, control_id=_COMMENT_CONTROL_ID, timeout=5,
                                       item_index=0, **_APP, **extra)


def _already_liked(task_id, judge_params):
    """Ask the backend whether the comment was already handled; any doubt counts as "yes"."""
    judge_response = httpx.get(user_operate_is_like_url, params=judge_params)
    loggerKit.info('taskId:{0}judge_comment_api:{1},返回结果:{2}', task_id, judge_params, judge_response.text)
    if judge_response.is_success:
        judge_data = json.loads(judge_response.text).get('data')
        # A response without "data" used to raise TypeError in int(None);
        # it is now handled like a failed call below.
        if judge_data is not None:
            if int(judge_data) == 1:
                loggerKit.info('taskId:{0}judge_comment_api已经点赞过:{1}', task_id, judge_params)
                return True
            return False
    loggerKit.info('taskId:{0}judge_comment_api是否点赞接口调用失败:{1}', task_id, judge_params)
    return True


def _is_positive_comment(task_id, comment_text):
    """Ask the sentiment service whether ``comment_text`` is positive; failures count as "no"."""
    direction_params = {
        "comment": comment_text.replace('首评', '')
    }
    direction_response = httpx.post(comment_direction_url, json=direction_params, timeout=120)
    loggerKit.info('taskId:{0}判断正负向接口调用:{1},返回结果:{2}', task_id, direction_params,
                   direction_response.text)
    if not direction_response.is_success:
        loggerKit.info('taskId:{0}判断正负向接口调用失败:{1}', task_id, direction_params)
        return False
    direction_data = json.loads(direction_response.text).get('data')
    # A missing/non-object "data" used to raise AttributeError; treat it as
    # "not positive", the same outcome as a failed call.
    if not isinstance(direction_data, dict):
        return False
    return direction_data.get('direction') == 'positive'


# Douyin interaction (comment sentiment check + like).
# version 29.5.0
def douyin_spider(task_id, keyword, data):
    """Return the next command for a device running the "like positive comments" task.

    The function is called once per device round trip. Progress is kept in
    Redis as ``<device_id>_<step>`` flags plus ``<device_id>operate`` (id of
    the last command sent); a step only runs when its flag is "1" and the
    request's ``performActionId`` equals the stored id.

    Args:
        task_id: Task identifier, forwarded to ``callback_task`` and logs.
        keyword: Ignored; it is overwritten with ``extendInfo['url']``.
        data: Request dict. Reads ``deviceID``, ``performActionId``,
            ``result`` (``performActionResult`` / ``performActionText``) and
            ``data['extendInfo']`` (a JSON string containing ``url``).

    Returns:
        The command dict produced by one of the ``atom_data`` builders, an
        error dict ``{"data", "code": -2, "message"}`` on failure, or ``None``
        when a result is present but no step matches.
    """
    loggerKit.info('请求信息:{0}'.format(data))
    device_id = data.get("deviceID")
    perform_action_id = data.get("performActionId")
    result = data.get("result")
    extend_info = json.loads(data['data']['extendInfo'])
    keyword = extend_info['url']

    if result is None:
        # First call of a task: start the app.
        action0_id = _new_action_id()
        action0_dict = start_app(action0_id, target_app="douyin", target_version="29.5.0",
                                 package_name="com.ss.android.ugc.aweme.lite")
        _advance(device_id, action0_id, "step0", "step9")
        loggerKit.info("设备:{0}, action0_id:{1}", device_id, action0_id)
        return action0_dict

    # Not the first command: validate the outcome of the previous one.
    perform_action_result = result.get("performActionResult")
    if perform_action_result is None:
        return _abort(task_id, device_id, '指令执行失败', "fail, performActionResult is null")
    if perform_action_result == "failure":
        return _abort(task_id, device_id, '任务执行失败,可能当前无法再次执行看广告任务',
                      "fail, performActionResult is not success")
    if perform_action_result == "stop":
        return _abort(task_id, device_id, '指令被用户终止', "fail, performActionResult is not success")

    # After each command its unique id is stored in Redis and returned to the
    # phone, which sends it back when asking for the next command.
    last_action_id = redis_client.get(device_id + "operate")

    def is_active(step):
        return _is_active_step(f"{device_id}_{step}", perform_action_id, last_action_id)

    element_missing = perform_action_result == "invalid operation"

    if is_active("step0"):
        # Command 1: tap the "我" (Me) tab.
        action1_id = _new_action_id()
        action1_dict = single_click_by_text(action1_id, text="我", **_APP)
        _advance(device_id, action1_id, "step_me", "step0")
        loggerKit.info("taskId:{0}, action1_id:{1}", task_id, action1_id)
        return action1_dict

    if is_active("step_me"):
        # Extra command: read the current Douyin account name.
        loggerKit.info("设备:{0}, last_action_id:{1}", device_id, int(last_action_id))
        action2_id = _new_action_id()
        action2_dict = get_text_by_control(action2_id, control_id="com.ss.android.ugc.aweme.lite:id/ib7",
                                           sleep_time=3, **_APP)
        _advance(device_id, action2_id, "step_get_account", "step_me")
        loggerKit.info("taskId:{0}, action2_id:{1}", task_id, action2_id)
        return action2_dict

    if is_active("step_get_account"):
        loggerKit.info("设备:{0}, last_action_id:{1}", device_id, int(last_action_id))
        action2_id = _new_action_id()
        # Cache the logged-in account for the later backend calls.
        account_name = result.get("performActionText")
        if account_name is None:
            # NOTE(review): this reply message looks copied from another task;
            # kept unchanged because callers may match on it.
            return _abort(task_id, device_id, '未获取到账号信息', "未找到签到指示")
        redis_client.set(device_id + 'account', account_name)
        # Command 2: tap "首页" (Home).
        action2_dict = single_click_by_text(action2_id, text="首页", **_APP)
        _advance(device_id, action2_id, "step_search", "step_get_account")
        loggerKit.info("taskId:{0}, action2_id:{1}", task_id, action2_id)
        return action2_dict

    if is_active("step_search"):
        if element_missing:
            return _abort(task_id, device_id, '回到首页失败', "fail, performActionResult is null")
        # Tap the search box.
        action1_id = _new_action_id()
        action1_dict = single_click_by_control(action1_id, control_id="com.ss.android.ugc.aweme.lite:id/dxw",
                                               timeout=5, **_APP)
        _advance(device_id, action1_id, "step1", "step_search")
        loggerKit.info("设备:{0}, action1_id:{1}", device_id, action1_id)
        return action1_dict

    if is_active("step1"):
        loggerKit.info("设备:{0}, action2_id_mem:{1}", device_id, int(last_action_id))
        if element_missing:
            return _abort(task_id, device_id, '点击搜索按钮失败', "fail, performActionResult is null")
        # Type the target URL into the search box.
        action2_id = _new_action_id()
        action2_dict = send_text_by_control(action2_id,
                                            control_id="com.ss.android.ugc.aweme.lite:id/et_search_kw",
                                            content=keyword, timeout=5, **_APP)
        _advance(device_id, action2_id, "step2", "step1")
        loggerKit.info("设备:{0}, action2_id:{1}", device_id, action2_id)
        return action2_dict

    if is_active("step2"):
        loggerKit.info("设备:{0}, action2_id_mem:{1}", device_id, int(last_action_id))
        if element_missing:
            return _abort(task_id, device_id, '搜索框赋值失败', "fail, performActionResult is null")
        # Tap the search button.
        action3_id = _new_action_id()
        action3_dict = single_click_by_control(action3_id, control_id="com.ss.android.ugc.aweme.lite:id/nou",
                                               timeout=5, **_APP)
        _advance(device_id, action3_id, "step3", "step2")
        loggerKit.info("设备:{0}, action2_id:{1}", device_id, action3_id)
        return action3_dict

    if is_active("step3"):
        loggerKit.info("设备:{0}, action2_id_mem:{1}", device_id, int(last_action_id))
        if element_missing:
            return _abort(task_id, device_id, '搜索失败', "fail, performActionResult is null")
        # Open the comment section.
        action4_id = _new_action_id()
        action4_dict = single_click_by_control(action4_id, control_id="com.ss.android.ugc.aweme.lite:id/bfw",
                                               timeout=5, **_APP)
        _advance(device_id, action4_id, "step5", "step3")
        loggerKit.info("设备:{0}, action2_id:{1}", device_id, action4_id)
        return action4_dict

    if is_active("step5"):
        loggerKit.info("设备:{0}, action2_id_mem:{1}", device_id, int(last_action_id))
        if element_missing:
            return _abort(task_id, device_id, '打开评论框失败', "fail, performActionResult is null")
        # Expand the comment section.
        action5_id = _new_action_id()
        action5_dict = single_click_by_control(action5_id, control_id="com.ss.android.ugc.aweme.lite:id/d+4",
                                               timeout=5, **_APP)
        _advance(device_id, action5_id, "step6", "step5")
        loggerKit.info("设备:{0}, action2_id:{1}", device_id, action5_id)
        return action5_dict

    if is_active("step6"):
        loggerKit.info("设备:{0}, action2_id_mem:{1}", device_id, int(last_action_id))
        if element_missing:
            return _abort(task_id, device_id, '放大评论区/点赞评论失败', "fail, performActionResult is null")
        action7_id = _new_action_id()
        # Finish the task once enough comments were processed; otherwise read
        # the next comment and remember its index.
        finished_count = redis_client.get(device_id + 'finish_comment')
        if finished_count is not None and int(finished_count) >= _MAX_FINISHED_COMMENTS:
            action7_dict = stop_app(action7_id, timeout=5, **_APP)
            del_key_vague(device_id)
            loggerKit.info("设备:{0}, action11_id:{1}", device_id, action7_id)
            # Tell the task center the task completed.
            callback_task(None, None, task_id, device_id, 1, None)
            return action7_dict
        # INCR returns the 1-based position; the command takes a 0-based index.
        comment_index = redis_client.incr(device_id + 'comment_index', 1)
        action7_dict = get_text_by_control_comment(action7_id, control_id=_COMMENT_CONTROL_ID, timeout=5,
                                                   item_index=comment_index - 1, sleep_time=5, **_APP)
        _advance(device_id, action7_id, "step7", "step6")
        loggerKit.info("设备:{0}, action7_id:{1}", device_id, action7_id)
        return action7_dict

    if is_active("step7"):
        loggerKit.info("设备:{0}, action7_id_mem:{1}", device_id, int(last_action_id))
        action8_id = _new_action_id()
        if element_missing:
            # Comment text could not be read: stop the app and fail the task.
            stop_dict = stop_app(action8_id, timeout=5, **_APP)
            del_key_vague(device_id)
            callback_task(500, '获取评论信息失败', task_id, device_id, 0, None)
            return stop_dict
        if perform_action_result == "indexOutOfBounds":
            # Ran past the loaded comments: probe for the "no more" footer.
            action8_dict = single_click_by_text(action8_id, text="暂时没有更多了", sleep_time=5, **_APP)
            _advance(device_id, action8_id, "swipe_comment", "step7")
            return action8_dict

        result_text = result.get("performActionText")
        if result_text is None or result_text == '':
            # Empty comment text: go back to step6 to fetch again.
            action_dict = _fetch_first_comment(action8_id)
            _advance(device_id, action8_id, "step6", "step7")
            return action_dict

        def skip_comment():
            # Count the comment as processed and return to step6.
            action_dict = _fetch_first_comment(action8_id, sleep_time=5)
            _advance(device_id, action8_id, "step6", "step7")
            redis_client.incr(device_id + 'finish_comment', 1)
            return action_dict

        # Has this account already liked this comment on this post?
        account_name = redis_client.get(device_id + 'account')
        judge_params = {"fromUser": account_name, "resource": keyword, "subResource": result_text}
        if _already_liked(task_id, judge_params):
            loggerKit.info("taskId:{0},对应评论为:{1}已经可能点赞过了", task_id, judge_params)
            return skip_comment()

        # Only positive comments get a like.
        if not _is_positive_comment(task_id, result_text):
            loggerKit.info("taskId:{0},对应评论为:{1}不需要点赞", task_id, result_text)
            return skip_comment()

        like_dict = spider_content_parent_find_click(action8_id, title=result_text,
                                                     child_control_id="com.ss.android.ugc.aweme.lite:id/cth",
                                                     item_index=3, timeout=5, sleep_time=5, **_APP)
        _advance(device_id, action8_id, "step8", "step7")
        # Remember the comment so step8 can report it once the like succeeded.
        redis_client.set(f"{device_id}_comment", result_text)
        loggerKit.info("设备:{0}, action2_id:{1}", device_id, action8_id)
        return like_dict

    if is_active("swipe_comment"):
        loggerKit.info("设备:{0}, action2_id_mem:{1}", device_id, int(last_action_id))
        action_swipe_id = _new_action_id()
        if element_missing:
            # No "no more" footer: swipe down and restart the index from 0.
            swipe_dict = swipe_screen(action_swipe_id, timeout=5, duration=2000, sleep_time=5, **_APP)
            _advance(device_id, action_swipe_id, "step6", "swipe_comment")
            loggerKit.info("设备:{0}, action2_id:{1}", device_id, action_swipe_id)
            redis_client.delete(device_id + 'comment_index')
            return swipe_dict
        # No more comments: stop the app and report success.
        # Fix: return the stop command dict (the original returned the
        # ``stop_app`` function object itself).
        stop_dict = stop_app(action_swipe_id, timeout=5, **_APP)
        del_key_vague(device_id)
        callback_task(None, None, task_id, device_id, 1, None)
        return stop_dict

    if is_active("step8"):
        loggerKit.info("设备:{0}, action2_id_mem:{1}", device_id, int(last_action_id))
        step9 = _new_action_id()
        # Either way the next round goes back to step6 for the next comment.
        action_dict = _fetch_first_comment(step9, sleep_time=5)
        _advance(device_id, step9, "step6", "step8")
        if element_missing:
            # Like button not found: the like did not happen, nothing to record.
            return action_dict
        # Like succeeded: record it in the backend and bump the counter.
        account_name = redis_client.get(device_id + 'account')
        comment = redis_client.get(f"{device_id}_comment")
        operate_add_params = {
            "fromUser": account_name,
            "resource": keyword,
            "subResource": comment
        }
        operate_add_response = httpx.post(user_operate_add_url, json=operate_add_params, timeout=120)
        loggerKit.info('taskId:{0}评论成功接口调用:{1},返回结果:{2}', task_id, operate_add_params,
                       operate_add_response.text)
        redis_client.incr(device_id + 'finish_comment', 1)
        return action_dict

    # A result was supplied but no step flag matched the reported action id.
    return None


# Bulk fuzzy deletion of cached keys (superseded by func.action_func.del_key_vague).
# def del_key_vague(device_id):
#     # Delete every key matching the device prefix.
#     keys = redis_client.match_pattern_prefix(device_id)
#     if len(keys) > 0:
#         # Must check for matches first; deleting with none raises an error.
#         for key in keys:
#             redis_client.delete(key)
#         loggerKit.info(f"clear {device_id} keys success...")
#     else:
#         loggerKit.info(f"{device_id} keys none ...")