# Douyin live-room comment script.
import random
import time

from func.action_func import del_key_vague
from task.task_job import callback_task
from tools import loggerKit, redis_client
from scene.oprator.atom_data import start_app, stop_app, get_text_by_control, single_click_by_control, \
    send_text_by_control, many_click_by_position, single_click_by_text
import yaml
import json
import httpx

with open('config.yaml', 'r') as file:
    config = yaml.load(file, Loader=yaml.FullLoader)

extern_domain = config['bmp-cp']['extern_domain']
# Task-execution callback URL.
task_callback_url = extern_domain + config['bmp-cp']['task_callback_url']
# AIGC comment-generation endpoint.
post_ai_gc_url = extern_domain + config['bmp-content-center']['comment_local_url']

# Keyword arguments shared by every instruction sent to the device
# (start_app is the only call that uses a different target_version).
_APP = {
    "target_app": "douyin",
    "target_version": "28.8.0",
    "package_name": "com.ss.android.ugc.aweme.lite",
}

# Comment posted whenever the AIGC service fails or returns unusable text.
_FALLBACK_COMMENT = '不错👍'
# Substrings that mark an AIGC reply as a refusal / non-answer.
_REJECTED_REPLY_MARKERS = ('内容太少', '对不起', '提供')
# "message" value used by the element-not-found failure responses.
_INVALID_OPERATION_MESSAGE = "fail, performActionResult is null"


def _new_action_id():
    """Return a millisecond-timestamp id for the next instruction."""
    return int(round(time.time() * 1000))


def _fail(task_id, device_id, message, callback_message):
    """Clear the device's cached keys, report failure to the task centre, and build the error response."""
    return_dict = {
        "data": "",
        "code": -2,
        "message": message
    }
    del_key_vague(device_id)
    callback_task(500, callback_message, task_id, device_id, 0, None)
    return return_dict


def _step_pending(device_id, step, perform_action_id, last_action_id):
    """Return True when the `step` flag is set and the device is answering the last issued action."""
    flag = redis_client.get(f"{device_id}_{step}")
    return (flag is not None and int(flag) == 1 and last_action_id is not None
            and int(perform_action_id) == int(last_action_id))


def _advance(device_id, action_id, finished_step, next_step):
    """Record `action_id` as the last issued action and move the step flag forward."""
    redis_client.set(device_id + "operate", action_id)
    redis_client.set(f"{device_id}_{next_step}", "1")
    redis_client.delete(f"{device_id}_{finished_step}")


def _fetch_live_comment(task_id, keyword):
    """Ask the AIGC service for a comment; fall back to a stock comment on any failure.

    The fallback is used when the request raises, the response is not a
    success, the body is not the expected JSON shape, or the generated text
    is empty / contains one of the refusal markers.
    """
    failure_log = "任务id:{0}对应标题:{1},请求AIGC失败信息:{2},返回信息:{3}"
    request_data = {
        "title": "",
        "comment": "",
        "platform": "douyin",
        "mode": "1"
    }
    loggerKit.info("任务id:{0},请求AIGC信息:{1}", task_id, request_data)
    try:
        response = httpx.post(post_ai_gc_url, json=request_data, timeout=120)
    except httpx.HTTPError as err:
        # Network error / timeout: comment with the stock text instead of aborting the task.
        loggerKit.info(failure_log, task_id, keyword, request_data, err)
        return _FALLBACK_COMMENT
    if not response.is_success:
        loggerKit.info(failure_log, task_id, keyword, request_data, response)
        return _FALLBACK_COMMENT

    try:
        response_body = json.loads(response.text)
    except ValueError:
        response_body = None
    reply_data = response_body.get('data') if isinstance(response_body, dict) else None
    reply = reply_data.get('comment') if isinstance(reply_data, dict) else None
    # Check the type first: the substring tests below would raise on None.
    if (not isinstance(reply, str) or reply == ''
            or any(marker in reply for marker in _REJECTED_REPLY_MARKERS)):
        loggerKit.info(failure_log, task_id, keyword, request_data, response)
        return _FALLBACK_COMMENT
    return reply


# Douyin interaction: search for the live room by link, optionally share it,
# then comment and like repeatedly.
# version 29.5.0
def douyin_spider(task_id, keyword, data):
    """Return the next device instruction for the live-room comment task.

    The function is called once per device round-trip. Progress is kept in
    Redis: ``<device_id>operate`` holds the id of the last issued action and
    a ``<device_id>_<step>`` flag marks which step is waiting for a result.

    Args:
        task_id: Id of the task, used for logging and task-centre callbacks.
        keyword: Ignored; it is overwritten with ``liveURL`` taken from
            ``data['data']['extendInfo']``.
        data: Request payload. Reads ``deviceID``, ``performActionId``,
            ``result`` (result of the previous action, absent on the first
            call) and ``data.extendInfo`` (a JSON string).

    Returns:
        The instruction built by one of the ``atom_data`` helpers, an error
        dict (``code`` -2) when the task is aborted, or ``None`` when no step
        matches the reported result.
    """
    loggerKit.info('请求信息:{0}'.format(data))
    device_id = data.get("deviceID")
    perform_action_id = data.get("performActionId")
    result = data.get("result")
    extension_info = json.loads(data['data']['extendInfo'])
    keyword = extension_info['liveURL']

    if result is None:
        # First call for this task: launch the app.
        action0_id = _new_action_id()
        action0_dict = start_app(action0_id, target_app="douyin", target_version="29.5.0",
                                 package_name="com.ss.android.ugc.aweme.lite")
        redis_client.set(device_id + "operate", action0_id)
        redis_client.set(f"{device_id}_step0", "1")
        # NOTE(review): presumably clears a stale flag left by an earlier run — confirm.
        redis_client.delete(f"{device_id}_step9")
        loggerKit.info("设备:{0}, action0_id:{1}", device_id, action0_id)
        return action0_dict

    # Not the first instruction: inspect the result of the previous action.
    perform_action_result = result.get("performActionResult")
    if perform_action_result is None:
        return _fail(task_id, device_id, "fail, performActionResult is null", '指令执行失败')
    if perform_action_result == "failure":
        return _fail(task_id, device_id, "fail, performActionResult is not success",
                     '任务执行失败,可能当前无法再次执行看广告任务')
    if perform_action_result == "stop":
        return _fail(task_id, device_id, "fail, performActionResult is not success", '指令被用户终止')

    # After each action its id is stored in Redis and returned to the phone;
    # the phone echoes that id back when asking for the next action.
    last_action_id = redis_client.get(device_id + "operate")
    succeeded = perform_action_result == "success"
    # "invalid operation" means the target element was not found on the device.
    element_missing = perform_action_result == "invalid operation"
    # Steps that have a failure handler must be entered for both outcomes;
    # gating them on success alone made the handlers unreachable.
    handled = succeeded or element_missing

    def pending(step):
        return _step_pending(device_id, step, perform_action_id, last_action_id)

    # App started -> tap the "我" (Me) tab.
    if pending("step0") and succeeded:
        action1_id = _new_action_id()
        action1_dict = single_click_by_text(action1_id, text="我", **_APP)
        _advance(device_id, action1_id, "step0", "step_me")
        loggerKit.info("taskId:{0}, action1_id:{1}", task_id, action1_id)
        return action1_dict

    # On the profile page -> read the current account name.
    if pending("step_me"):
        loggerKit.info("设备:{0}, last_action_id:{1}", device_id, int(last_action_id))
        action2_id = _new_action_id()
        action2_dict = get_text_by_control(action2_id, control_id="com.ss.android.ugc.aweme.lite:id/ib7",
                                           sleep_time=3, **_APP)
        _advance(device_id, action2_id, "step_me", "step_get_account")
        loggerKit.info("taskId:{0}, action2_id:{1}", task_id, action2_id)
        return action2_dict

    # Account name received -> cache it and go back to the home tab.
    if pending("step_get_account"):
        loggerKit.info("设备:{0}, last_action_id:{1}", device_id, int(last_action_id))
        action2_id = _new_action_id()
        account_name = result.get("performActionText")
        if account_name is None:
            return _fail(task_id, device_id, "未获取到账号信息", '未获取到账号信息')
        redis_client.set(device_id + 'account', account_name)
        action2_dict = single_click_by_text(action2_id, text="首页", **_APP)
        _advance(device_id, action2_id, "step_get_account", "step_search")
        loggerKit.info("taskId:{0}, action2_id:{1}", task_id, action2_id)
        return action2_dict

    # On the home tab -> tap the search box.
    if pending("step_search"):
        if element_missing:
            return _fail(task_id, device_id, _INVALID_OPERATION_MESSAGE, '回到首页失败')
        action1_id = _new_action_id()
        action1_dict = single_click_by_control(action1_id,
                                               control_id="com.ss.android.ugc.aweme.lite:id/dxw",
                                               timeout=5, **_APP)
        _advance(device_id, action1_id, "step_search", "step1")
        loggerKit.info("设备:{0}, action1_id:{1}", device_id, action1_id)
        return action1_dict

    # Search box focused -> type the live-room link.
    if pending("step1") and handled:
        loggerKit.info("设备:{0}, action2_id_mem:{1}", device_id, int(last_action_id))
        if element_missing:
            return _fail(task_id, device_id, _INVALID_OPERATION_MESSAGE, '点击搜索按钮失败')
        action2_id = _new_action_id()
        action2_dict = send_text_by_control(action2_id,
                                            control_id="com.ss.android.ugc.aweme.lite:id/et_search_kw",
                                            content=keyword, timeout=5, **_APP)
        _advance(device_id, action2_id, "step1", "step2")
        loggerKit.info("设备:{0}, action2_id:{1}", device_id, action2_id)
        return action2_dict

    # Link typed -> pick how many comments to post, then tap the search button.
    if pending("step2") and handled:
        loggerKit.info("设备:{0}, action3_id_mem:{1}", device_id, int(last_action_id))
        if element_missing:
            return _fail(task_id, device_id, _INVALID_OPERATION_MESSAGE, '搜索框赋值失败')
        action3_id = _new_action_id()
        comment_count = random.randint(5, 8)
        redis_client.set(device_id + 'comment_count', comment_count)
        loggerKit.info("任务id:{0}, 评论次数:{1}", task_id, comment_count)
        action3_dict = single_click_by_control(action3_id,
                                               control_id="com.ss.android.ugc.aweme.lite:id/nou", **_APP)
        _advance(device_id, action3_id, "step2", "step3")
        loggerKit.info("设备:{0}, action3_id:{1}", device_id, action3_id)
        return action3_dict

    # In the live room -> share first if this account has not shared the
    # link yet, otherwise go straight to commenting.
    if pending("step3") and handled:
        loggerKit.info("设备:{0}, action3_id_mem:{1}", device_id, int(last_action_id))
        if element_missing:
            # NOTE(review): callback text duplicates the previous step's; kept as in the original.
            return _fail(task_id, device_id, _INVALID_OPERATION_MESSAGE, '搜索框赋值失败')
        action4_id = _new_action_id()
        account_name = redis_client.get(device_id + 'account')
        # set_nx result is truthy when the account+link marker did not exist yet.
        done_link = redis_client.set_nx(account_name + keyword, 'done')
        if done_link:
            # Open the "more" toolbar to start sharing.
            action6_dict = single_click_by_control(action4_id,
                                                   control_id="m.l.live.plugin:id/more_toolbar_icon",
                                                   timeout=5, **_APP)
            _advance(device_id, action4_id, "step3", "step4")
            loggerKit.info("设备:{0}, action7_id:{1}", device_id, action4_id)
            return action6_dict
        # Already shared: tap the screen twice and continue with the comment box.
        action6_dict = many_click_by_position(action4_id, count=2, timeout=5, **_APP)
        _advance(device_id, action4_id, "step3", "step6")
        loggerKit.info("设备:{0}, action7_id:{1}", device_id, action4_id)
        return action6_dict

    # "More" toolbar open -> tap "分享" (Share).
    if pending("step4") and handled:
        loggerKit.info("设备:{0}, action6_id_mem:{1}", device_id, int(last_action_id))
        if element_missing:
            return _fail(task_id, device_id, _INVALID_OPERATION_MESSAGE, '点击直播间右下角更多按钮失败')
        action5_id = _new_action_id()
        action6_dict = single_click_by_text(action5_id, text="分享", timeout=5, **_APP)
        _advance(device_id, action5_id, "step4", "step5")
        loggerKit.info("设备:{0}, action7_id:{1}", device_id, action5_id)
        return action6_dict

    # Share sheet open -> tap "复制链接" (Copy link).
    if pending("step5") and handled:
        loggerKit.info("设备:{0}, action6_id_mem:{1}", device_id, int(last_action_id))
        if element_missing:
            return _fail(task_id, device_id, _INVALID_OPERATION_MESSAGE, '点击分享失败')
        action5_id = _new_action_id()
        action6_dict = single_click_by_text(action5_id, text="复制链接", timeout=5, **_APP)
        _advance(device_id, action5_id, "step5", "step6")
        loggerKit.info("设备:{0}, action7_id:{1}", device_id, action5_id)
        return action6_dict

    # Sharing finished or skipped -> open the live-room comment input.
    if pending("step6") and handled:
        loggerKit.info("设备:{0}, action6_id_mem:{1}", device_id, int(last_action_id))
        if element_missing:
            return _fail(task_id, device_id, _INVALID_OPERATION_MESSAGE, '点击搜索按钮/分享链接失败')
        action6_id = _new_action_id()
        action6_dict = single_click_by_control(action6_id,
                                               control_id="m.l.live.plugin:id/edit_btn_audience",
                                               timeout=5, **_APP)
        _advance(device_id, action6_id, "step6", "step8")
        loggerKit.info("设备:{0}, action7_id:{1}", device_id, action6_id)
        return action6_dict

    # Comment input open -> generate a comment and type it.
    if pending("step8") and handled:
        loggerKit.info("设备:{0}, action6_id_mem:{1}", device_id, int(last_action_id))
        if element_missing:
            return _fail(task_id, device_id, _INVALID_OPERATION_MESSAGE, '唤醒评论输入框失败')
        action9_id = _new_action_id()
        reply = _fetch_live_comment(task_id, keyword)
        _advance(device_id, action9_id, "step8", "step9")
        loggerKit.info("设备:{0}, action7_id:{1}", device_id, action9_id)
        action9_dict = send_text_by_control(action9_id,
                                            control_id="m.l.live.plugin:id/edit_text_container",
                                            content=reply, timeout=5, sleep_time=5, item_index=0, **_APP)
        return action9_dict

    # Comment typed -> tap send.
    if pending("step9") and handled:
        loggerKit.info("设备:{0}, action9_id_mem:{1}", device_id, int(last_action_id))
        if element_missing:
            return _fail(task_id, device_id, _INVALID_OPERATION_MESSAGE, '评论输入框赋值失败')
        action10_id = _new_action_id()
        action10_dict = single_click_by_control(action10_id,
                                                control_id="m.l.live.plugin:id/input_panel_send_view",
                                                timeout=5, sleep_time=5, **_APP)
        _advance(device_id, action10_id, "step9", "step10")
        loggerKit.info("设备:{0}, action10_id:{1}", device_id, action10_id)
        return action10_dict

    # Comment sent -> like the live room with a random number of taps.
    if pending("step10") and handled:
        loggerKit.info("设备:{0}, action10_id_mem:{1}", device_id, int(last_action_id))
        if element_missing:
            return _fail(task_id, device_id, _INVALID_OPERATION_MESSAGE, '发送评论失败')
        action11_id = _new_action_id()
        like_count = random.randint(10, 15)
        action11_dict = many_click_by_position(action11_id, count=like_count, timeout=5, **_APP)
        _advance(device_id, action11_id, "step10", "step11")
        loggerKit.info("设备:{0}, action10_id:{1}", device_id, action11_id)
        return action11_dict

    # One comment round finished -> stop the app or start another round.
    if pending("step11"):
        loggerKit.info("设备:{0}, action10_id_mem:{1}", device_id, int(last_action_id))
        # Rounds completed so far.
        done_comment_count = redis_client.incr(device_id + "done_count", 1)
        # Rounds planned for this task.
        anticipate_comment_count = redis_client.get(device_id + 'comment_count')
        action12_id = _new_action_id()
        # A missing target (e.g. the key is gone) ends the task instead of crashing on int(None).
        if anticipate_comment_count is None or int(done_comment_count) >= int(anticipate_comment_count):
            action12_dict = stop_app(action12_id, timeout=5, **_APP)
            del_key_vague(device_id)
            loggerKit.info("设备:{0}, action11_id:{1}", device_id, action12_id)
            # Tell the task centre the task finished successfully.
            callback_task(None, None, task_id, device_id, 1, None)
            return action12_dict
        # Target not reached: reopen the comment input for another round.
        action6_dict = single_click_by_control(action12_id,
                                               control_id="m.l.live.plugin:id/edit_btn_audience",
                                               timeout=5, **_APP)
        _advance(device_id, action12_id, "step11", "step8")
        loggerKit.info("设备:{0}, action7_id:{1}", device_id, action12_id)
        return action6_dict

    # No step matched the reported result.
    return None