"""Strategy classes that dispatch an RPA task to the matching automation script."""
import json

from cubic.cubic_simple_action import cubic_simple_action
from dongchedi.simple_comment import simple_comment
from douyin.douyin_officail_account_interact import douyin_random_watch_video
from kuaishou.gifmaker_random_watch_video import gifmaker_random_watch_video
from tools import loggerKit
from strategy.rpa_enum import media_type, task_sub_type
from douyin.douyin_random_search import douyin_random_search
from douyin.simple_browser_video import douyin_spider as douyin_sign_browser_video
from douyin.douyin_sign import douyin_spider as douyin_sign
from douyin.douyin_watch_advise import douyin_spider as douyin_watch_advise
from douyin.douyin_boxes_advise import douyin_spider as douyin_boxes_advise
from douyin.douyin_user_operate import douyin_spider as douyin_user_demo
from douyin.douyin_train_home_comment import douyin_spider as douyin_train_home_comment
from douyin.douyin_interaction_urgent import douyin_spider as douyin_interact_urgent
from douyin.douyin_live_comment import douyin_spider as douyin_live_comment
from douyin.douyin_train_home_page import douyin_spider as douyin_train_home_page
from dongchedi.random_browse import random_browse as dongchedi_content_interaction
from dongchedi.dongchedi_sign import dongchedi_sign as dongchedi_sign
from toutiao.search_double import search_task as search_task
from toutiao.search_random import search_random_task as search_random_task
from toutiao.watch_novel import watch_novel_task as watch_novel_task
from toutiao.meal_allowance import meal_allowance_task as meal_allowance_task
from xiaohongshu.xingin_random_watch_video import xingin_random_watch_video
from yiche.yiche_direction_action import yiche_direction_action


class rpa_processor_strategy:
    """Base strategy: subclasses override ``process_rpa`` to run one RPA script."""

    def process_rpa(self, task):
        """Handle ``task``; the base implementation does nothing and returns None."""
        pass


class douyin_browser_operation(rpa_processor_strategy):
    """Douyin: watch videos to collect coins (delegates to ``douyin_sign_browser_video``)."""

    def process_rpa(self, task):
        """Log the task, then return the result of ``douyin_sign_browser_video``."""
        loggerKit.info("taskId:{0},设备号:{1},开始进行抖音demo脚本操作,请求信息:{2}",
                       task.task_id, task.device_id,
                       json.dumps(task.to_dict(), ensure_ascii=False))
        return douyin_sign_browser_video(task.device_id, task.task_id, task.resource_name,
                                         task.media_channel, task.request_data)


class douyin_sign_operation(rpa_processor_strategy):
    """Douyin: sign-in operation (delegates to ``douyin_sign``)."""

    def process_rpa(self, task):
        """Log the task, then return the result of ``douyin_sign``."""
        loggerKit.info("taskId:{0},设备号:{1},开始进行抖音demo脚本操作,请求信息:{2}",
                       task.task_id, task.device_id,
                       json.dumps(task.to_dict(), ensure_ascii=False))
        return douyin_sign(task.device_id, task.execute_robot_name, task.task_id,
                           task.resource_name, task.media_channel, task.request_data)


class douyin_watch_advise_operation(rpa_processor_strategy):
    """Douyin: watch-ads operation (delegates to ``douyin_watch_advise``)."""

    def process_rpa(self, task):
        """Log the task, then return the result of ``douyin_watch_advise``."""
        loggerKit.info("taskId:{0},设备号:{1},开始进行抖音demo脚本操作,请求信息:{2}",
                       task.task_id, task.device_id,
                       json.dumps(task.to_dict(), ensure_ascii=False))
        return douyin_watch_advise(task.device_id, task.task_id, task.resource_name,
                                   task.media_channel, task.request_data)


class douyin_boxes_advise_operation(rpa_processor_strategy):
    """Douyin: open treasure box + watch ads (delegates to ``douyin_boxes_advise``)."""

    def process_rpa(self, task):
        """Log the task, then return the result of ``douyin_boxes_advise``."""
        loggerKit.info("taskId:{0},设备号:{1},开始进行抖音demo脚本操作,请求信息:{2}",
                       task.task_id, task.device_id,
                       json.dumps(task.to_dict(), ensure_ascii=False))
        return douyin_boxes_advise(task.device_id, task.execute_robot_name, task.task_id,
                                   task.resource_name, task.media_channel, task.request_data)


class dongchedi_content_interaction_operation(rpa_processor_strategy):
    """Dongchedi: random account nurturing (delegates to ``dongchedi_content_interaction``)."""

    def process_rpa(self, task):
        """Log the task, then return the result of ``dongchedi_content_interaction``."""
        loggerKit.info("taskId:{0},设备号:{1},开始进行懂车帝养号脚本操作,请求信息:{2}",
                       task.task_id, task.device_id,
                       json.dumps(task.to_dict(), ensure_ascii=False))
        return dongchedi_content_interaction(task.task_id, task.execute_robot_name,
                                             task.request_data)


class toutiao_search_task_interaction_operation(rpa_processor_strategy):
    """Toutiao: "search to earn" task (delegates to ``search_task``)."""

    def process_rpa(self, task):
        """Log the task, then return the result of ``search_task``."""
        loggerKit.info("taskId:{0},设备号:{1},开始进行头条搜一搜赚钱脚本操作,请求信息:{2}",
                       task.task_id, task.device_id,
                       json.dumps(task.to_dict(), ensure_ascii=False))
        return search_task(task.task_id, task.request_data)


class toutiao_search_random_task_interaction_operation(rpa_processor_strategy):
    """Toutiao: random-search earning task (delegates to ``search_random_task``)."""

    def process_rpa(self, task):
        """Log the task, then return the result of ``search_random_task``."""
        # Log text previously duplicated the "搜一搜" task's message; it now names this task.
        loggerKit.info("taskId:{0},设备号:{1},开始进行头条随机搜赚钱脚本操作,请求信息:{2}",
                       task.task_id, task.device_id,
                       json.dumps(task.to_dict(), ensure_ascii=False))
        return search_random_task(task.task_id, task.request_data)


class toutiao_watch_novel_task_interaction_operation(rpa_processor_strategy):
    """Toutiao: read-novels earning task (delegates to ``watch_novel_task``)."""

    def process_rpa(self, task):
        """Log the task, then return the result of ``watch_novel_task``."""
        loggerKit.info("taskId:{0},设备号:{1},开始进行头条看小说赚钱脚本操作,请求信息:{2}",
                       task.task_id, task.device_id,
                       json.dumps(task.to_dict(), ensure_ascii=False))
        return watch_novel_task(task.task_id, task.request_data)


class toutiao_meal_allwance_task_interaction_operation(rpa_processor_strategy):
    """Toutiao: meal-allowance task (delegates to ``meal_allowance_task``)."""

    def process_rpa(self, task):
        """Log the task, then return the result of ``meal_allowance_task``."""
        loggerKit.info("taskId:{0},设备号:{1},开始进行头条吃饭补贴脚本操作,请求信息:{2}",
                       task.task_id, task.device_id,
                       json.dumps(task.to_dict(), ensure_ascii=False))
        return meal_allowance_task(task.task_id, task.request_data)


class douyin_user_demo_operation(rpa_processor_strategy):
    """Douyin: user demo task (delegates to ``douyin_user_demo``)."""

    def process_rpa(self, task):
        """Log the task, then return the result of ``douyin_user_demo``."""
        # Log text previously said "搜一搜赚钱" (copy-paste); it now names this task.
        loggerKit.info("taskId:{0},设备号:{1},开始进行抖音用户demo脚本操作,请求信息:{2}",
                       task.task_id, task.device_id,
                       json.dumps(task.to_dict(), ensure_ascii=False))
        return douyin_user_demo(task.task_id, task.resource_name, task.request_data)


class douyin_random_search_operation(rpa_processor_strategy):
    """Douyin Lite: random search (delegates to ``douyin_random_search``)."""

    def process_rpa(self, task):
        """Log the task, then return the result of ``douyin_random_search``."""
        loggerKit.info("taskId:{0},设备号:{1},开始进行抖音随机搜索操作,请求信息:{2}",
                       task.task_id, task.device_id,
                       json.dumps(task.to_dict(), ensure_ascii=False))
        return douyin_random_search(task.task_id, task.resource_name, task.request_data)


class dongchedi_sign_operation(rpa_processor_strategy):
    """Dongchedi: sign-in operation (delegates to ``dongchedi_sign``)."""

    def process_rpa(self, task):
        """Log the task, then return the result of ``dongchedi_sign``."""
        loggerKit.info("taskId:{0},设备号:{1},开始进行懂车帝签到操作,请求信息:{2}",
                       task.task_id, task.device_id,
                       json.dumps(task.to_dict(), ensure_ascii=False))
        return dongchedi_sign(task.task_id, task.resource_name, task.request_data)


class dongchedi_simple_comment(rpa_processor_strategy):
    """Dongchedi: simple interaction (delegates to ``simple_comment``)."""

    def process_rpa(self, task):
        """Log the task, then return the result of ``simple_comment``."""
        loggerKit.info("taskId:{0},设备号:{1},开始进行懂车帝简单互动操作,请求信息:{2}",
                       task.task_id, task.device_id,
                       json.dumps(task.to_dict(), ensure_ascii=False))
        return simple_comment(task.task_id, task.resource_name, task.request_data)


class kuaishou_random_watch_video_operation(rpa_processor_strategy):
    """Kuaishou: simple account nurturing (delegates to ``gifmaker_random_watch_video``)."""

    def process_rpa(self, task):
        """Log the task, then return the result of ``gifmaker_random_watch_video``."""
        loggerKit.info("taskId:{0},设备号:{1},开始进行快手简单养号操作,请求信息:{2}",
                       task.task_id, task.device_id,
                       json.dumps(task.to_dict(), ensure_ascii=False))
        return gifmaker_random_watch_video(task.task_id, task.resource_name, task.request_data)


class xiaohongshu_random_watch_video_operation(rpa_processor_strategy):
    """Xiaohongshu: simple account nurturing (delegates to ``xingin_random_watch_video``)."""

    def process_rpa(self, task):
        """Log the task, then return the result of ``xingin_random_watch_video``."""
        # Log text previously said "快手" (Kuaishou); it now names Xiaohongshu.
        loggerKit.info("taskId:{0},设备号:{1},开始进行小红书简单养号操作,请求信息:{2}",
                       task.task_id, task.device_id,
                       json.dumps(task.to_dict(), ensure_ascii=False))
        return xingin_random_watch_video(task.task_id, task.resource_name, task.request_data)


class yiche_direction_action_operation(rpa_processor_strategy):
    """Yiche: official-media operation (delegates to ``yiche_direction_action``)."""

    def process_rpa(self, task):
        """Log the task, then return the result of ``yiche_direction_action``."""
        loggerKit.info("taskId:{0},设备号:{1},开始进行易车官媒操作,请求信息:{2}",
                       task.task_id, task.device_id,
                       json.dumps(task.to_dict(), ensure_ascii=False))
        return yiche_direction_action(task.task_id, task.resource_name, task.request_data)


class cubic_simple_action_operation(rpa_processor_strategy):
    """Autohome: official-media operation (delegates to ``cubic_simple_action``)."""

    def process_rpa(self, task):
        """Log the task, then return the result of ``cubic_simple_action``."""
        loggerKit.info("taskId:{0},设备号:{1},开始进行汽车之家官媒操作,请求信息:{2}",
                       task.task_id, task.device_id,
                       json.dumps(task.to_dict(), ensure_ascii=False))
        return cubic_simple_action(task.task_id, task.resource_name, task.request_data)


class douyin_random_watch_video_operation(rpa_processor_strategy):
    """Douyin (regular edition): official-media operation (delegates to ``douyin_random_watch_video``)."""

    def process_rpa(self, task):
        """Log the task, then return the result of ``douyin_random_watch_video``."""
        # Log text previously said "快手简单养号" (copy-paste); it now names this task.
        loggerKit.info("taskId:{0},设备号:{1},开始进行抖音常规版官媒操作,请求信息:{2}",
                       task.task_id, task.device_id,
                       json.dumps(task.to_dict(), ensure_ascii=False))
        return douyin_random_watch_video(task.task_id, task.resource_name, task.request_data)


class douyin_train_home_page_operation(rpa_processor_strategy):
    """Douyin: account nurturing on the home feed (delegates to ``douyin_train_home_page``).

    Per the original author's note: browse 30-50 videos for random durations,
    with likes and favorites.
    """

    def process_rpa(self, task):
        """Log the task, then return the result of ``douyin_train_home_page``."""
        # Log text previously said "懂车帝签到" (copy-paste); it now names this task.
        loggerKit.info("taskId:{0},设备号:{1},开始进行抖音养号首页操作,请求信息:{2}",
                       task.task_id, task.device_id,
                       json.dumps(task.to_dict(), ensure_ascii=False))
        return douyin_train_home_page(task.task_id, task.resource_name, task.request_data)


class douyin_train_home_comment_operation(rpa_processor_strategy):
    """Douyin: account nurturing via home-feed post comments (delegates to ``douyin_train_home_comment``).

    Per the original author's note: browse 30-50 posts for random durations
    and view their comments.
    """

    def process_rpa(self, task):
        """Log the task, then return the result of ``douyin_train_home_comment``."""
        # Log text previously said "懂车帝签到" (copy-paste); it now names this task.
        loggerKit.info("taskId:{0},设备号:{1},开始进行抖音养号首页帖子评论操作,请求信息:{2}",
                       task.task_id, task.device_id,
                       json.dumps(task.to_dict(), ensure_ascii=False))
        return douyin_train_home_comment(task.task_id, task.resource_name, task.request_data)


class douyin_urgent_interact_operation(rpa_processor_strategy):
    """Douyin: urgent interaction (delegates to ``douyin_interact_urgent``)."""

    def process_rpa(self, task):
        """Log the task, then return the result of ``douyin_interact_urgent``."""
        # Log text previously said "懂车帝签到" (copy-paste); it now names this task.
        loggerKit.info("taskId:{0},设备号:{1},开始进行抖音紧急互动操作,请求信息:{2}",
                       task.task_id, task.device_id,
                       json.dumps(task.to_dict(), ensure_ascii=False))
        return douyin_interact_urgent(task.task_id, task.resource_name, task.request_data)


class douyin_live_operation(rpa_processor_strategy):
    """Douyin: live-stream comment operation (delegates to ``douyin_live_comment``)."""

    def process_rpa(self, task):
        """Log the task, then return the result of ``douyin_live_comment``."""
        # Log text previously said "懂车帝签到" (copy-paste); it now names this task.
        loggerKit.info("taskId:{0},设备号:{1},开始进行抖音直播评论操作,请求信息:{2}",
                       task.task_id, task.device_id,
                       json.dumps(task.to_dict(), ensure_ascii=False))
        return douyin_live_comment(task.task_id, task.resource_name, task.request_data)


class rpa_processor:
    """Context object that holds a strategy and forwards tasks to it."""

    def __init__(self, strategy):
        """Store the strategy that ``process_rpa`` will delegate to."""
        self.strategy = strategy

    def set_strategy(self, strategy):
        """Replace the current strategy."""
        self.strategy = strategy

    def process_rpa(self, task):
        """Return whatever the current strategy's ``process_rpa`` returns for ``task``."""
        return self.strategy.process_rpa(task)


class rpa_handler:
    """Selects the strategy for a task and wraps it in an ``rpa_processor``."""

    @staticmethod
    def set_processor(task):
        """Build the ``rpa_processor`` matching ``task``.

        The strategy is chosen by the pair ``(task.media_channel, task.task_sub_type)``.

        Args:
            task: object exposing ``media_channel``, ``action_type``, ``answer_type``,
                ``task_type``, ``task_sub_type`` and ``task_id``.

        Returns:
            An ``rpa_processor`` wrapping a new instance of the matched strategy.

        Raises:
            ValueError: if no strategy is registered for the task's key.
        """
        loggerKit.info(
            'media_channel:{0}, action_type:{1},answer_type:{2}, task_type:{3} task_sub_type:{4} ,task_id:{5}',
            task.media_channel, task.action_type, task.answer_type, task.task_type,
            task.task_sub_type, task.task_id)

        # Values are classes rather than instances so that only the selected
        # strategy is instantiated per call.
        strategy_mapping = {
            (media_type.douyin.value, task_sub_type.douyin_sign_browser_video.value):
                douyin_browser_operation,
            (media_type.douyin.value, task_sub_type.douyin_sign.value):
                douyin_sign_operation,
            (media_type.douyin.value, task_sub_type.douyin_watch_advise.value):
                douyin_watch_advise_operation,
            (media_type.douyin.value, task_sub_type.douyin_boxes_advise.value):
                douyin_boxes_advise_operation,
            (media_type.dongchedi.value, task_sub_type.content_interaction.value):
                dongchedi_content_interaction_operation,
            (media_type.toutiao.value, task_sub_type.toutiao_search_task.value):
                toutiao_search_task_interaction_operation,
            (media_type.toutiao.value, task_sub_type.toutiao_search_random_task.value):
                toutiao_search_random_task_interaction_operation,
            (media_type.toutiao.value, task_sub_type.toutiao_watch_novel_task.value):
                toutiao_watch_novel_task_interaction_operation,
            (media_type.toutiao.value, task_sub_type.toutiao_meal_allowance_task.value):
                toutiao_meal_allwance_task_interaction_operation,
            (media_type.douyin.value, task_sub_type.douyin_user_demo.value):
                douyin_user_demo_operation,
            (media_type.dongchedi.value, task_sub_type.dongchedi_sign.value):
                dongchedi_sign_operation,
            (media_type.douyin.value, task_sub_type.douyin_random_search.value):
                douyin_random_search_operation,
            (media_type.douyin.value, task_sub_type.douyin_train_home_comment.value):
                douyin_train_home_comment_operation,
            (media_type.douyin.value, task_sub_type.douyin_train_home_page.value):
                douyin_train_home_page_operation,
            (media_type.douyin.value, task_sub_type.douyin_urgent_interact.value):
                douyin_urgent_interact_operation,
            (media_type.dongchedi.value, task_sub_type.dongchedi_comment.value):
                dongchedi_simple_comment,
            (media_type.kuaishou.value, task_sub_type.kuaishou_train_home.value):
                kuaishou_random_watch_video_operation,
            # NOTE(review): every other key reads task_sub_type.<member>.value; this one
            # goes through task_sub_type.xiaohongshu first. Kept as-is because rpa_enum
            # is not visible here -- confirm this attribute path is intended.
            (media_type.xiaohongshu.value, task_sub_type.xiaohongshu.xiaohongshu_train_home.value):
                xiaohongshu_random_watch_video_operation,
            (media_type.douyin.value, task_sub_type.douyin_office_media_operator.value):
                douyin_random_watch_video_operation,
            (media_type.yiche.value, task_sub_type.yi_che_office_media_operator.value):
                yiche_direction_action_operation,
            (media_type.auto_home.value, task_sub_type.auto_home_office_media_operator.value):
                cubic_simple_action_operation,
            (media_type.douyin.value, task_sub_type.douyin_live_media_Interaction.value):
                douyin_live_operation,
        }

        # Look up the strategy class by (media_channel, task_sub_type); None means no match.
        strategy_cls = strategy_mapping.get((task.media_channel, task.task_sub_type))
        if strategy_cls is None:
            raise ValueError("No strategy found for the given task")

        return rpa_processor(strategy_cls())