||
- import json
- from cubic.cubic_simple_action import cubic_simple_action
- from dongchedi.simple_comment import simple_comment
- from douyin.douyin_officail_account_interact import douyin_random_watch_video
- from kuaishou.gifmaker_random_watch_video import gifmaker_random_watch_video
- from tools import loggerKit
- from strategy.rpa_enum import media_type, task_sub_type
- from douyin.douyin_random_search import douyin_random_search
- from douyin.simple_browser_video import douyin_spider as douyin_sign_browser_video
- from douyin.douyin_sign import douyin_spider as douyin_sign
- from douyin.douyin_watch_advise import douyin_spider as douyin_watch_advise
- from douyin.douyin_boxes_advise import douyin_spider as douyin_boxes_advise
- from douyin.douyin_user_operate import douyin_spider as douyin_user_demo
- from douyin.douyin_train_home_comment import douyin_spider as douyin_train_home_comment
- from douyin.douyin_interaction_urgent import douyin_spider as douyin_interact_urgent
- from douyin.douyin_live_comment import douyin_spider as douyin_live_comment
- from douyin.douyin_train_home_page import douyin_spider as douyin_train_home_page
- from dongchedi.random_browse import random_browse as dongchedi_content_interaction
- from dongchedi.dongchedi_sign import dongchedi_sign as dongchedi_sign
- from toutiao.search_double import search_task as search_task
- from toutiao.search_random import search_random_task as search_random_task
- from toutiao.watch_novel import watch_novel_task as watch_novel_task
- from toutiao.meal_allowance import meal_allowance_task as meal_allowance_task
- from xiaohongshu.xingin_random_watch_video import xingin_random_watch_video
- from yiche.yiche_direction_action import yiche_direction_action
- # 定义策略过程
- class rpa_processor_strategy:
- def process_rpa(self, task):
- pass
- # 抖音看视频领金币操作
- class douyin_browser_operation(rpa_processor_strategy):
- def process_rpa(self, task):
- loggerKit.info("taskId:{0},设备号:{1},开始进行抖音demo脚本操作,请求信息:{2}", task.task_id, task.device_id,
- json.dumps(task.to_dict(), ensure_ascii=False))
- return douyin_sign_browser_video(task.device_id, task.task_id, task.resource_name, task.media_channel,
- task.request_data)
- # 抖音签到操作
- class douyin_sign_operation(rpa_processor_strategy):
- def process_rpa(self, task):
- loggerKit.info("taskId:{0},设备号:{1},开始进行抖音demo脚本操作,请求信息:{2}", task.task_id, task.device_id,
- json.dumps(task.to_dict(), ensure_ascii=False))
- return douyin_sign(task.device_id, task.execute_robot_name, task.task_id, task.resource_name,
- task.media_channel, task.request_data)
- # 抖音看广告操作
- class douyin_watch_advise_operation(rpa_processor_strategy):
- def process_rpa(self, task):
- loggerKit.info("taskId:{0},设备号:{1},开始进行抖音demo脚本操作,请求信息:{2}", task.task_id, task.device_id,
- json.dumps(task.to_dict(), ensure_ascii=False))
- return douyin_watch_advise(task.device_id, task.task_id, task.resource_name, task.media_channel,
- task.request_data)
- # 抖音开宝箱+看广告操作
- class douyin_boxes_advise_operation(rpa_processor_strategy):
- def process_rpa(self, task):
- loggerKit.info("taskId:{0},设备号:{1},开始进行抖音demo脚本操作,请求信息:{2}", task.task_id, task.device_id,
- json.dumps(task.to_dict(), ensure_ascii=False))
- return douyin_boxes_advise(task.device_id, task.execute_robot_name, task.task_id,
- task.resource_name, task.media_channel, task.request_data)
- # 懂车帝随机养号
- class dongchedi_content_interaction_operation(rpa_processor_strategy):
- def process_rpa(self, task):
- loggerKit.info("taskId:{0},设备号:{1},开始进行懂车帝养号脚本操作,请求信息:{2}", task.task_id, task.device_id,
- json.dumps(task.to_dict(), ensure_ascii=False))
- return dongchedi_content_interaction(task.task_id, task.execute_robot_name, task.request_data)
- # 头条搜一搜赚钱任务
- class toutiao_search_task_interaction_operation(rpa_processor_strategy):
- def process_rpa(self, task):
- loggerKit.info("taskId:{0},设备号:{1},开始进行头条搜一搜赚钱脚本操作,请求信息:{2}", task.task_id,
- task.device_id,
- json.dumps(task.to_dict(), ensure_ascii=False))
- return search_task(task.task_id, task.request_data)
- # 头条随机搜赚钱任务
- class toutiao_search_random_task_interaction_operation(rpa_processor_strategy):
- def process_rpa(self, task):
- loggerKit.info("taskId:{0},设备号:{1},开始进行头条搜一搜赚钱脚本操作,请求信息:{2}", task.task_id,
- task.device_id,
- json.dumps(task.to_dict(), ensure_ascii=False))
- return search_random_task(task.task_id, task.request_data)
- # 头条看小说赚钱任务
- class toutiao_watch_novel_task_interaction_operation(rpa_processor_strategy):
- def process_rpa(self, task):
- loggerKit.info("taskId:{0},设备号:{1},开始进行头条看小说赚钱脚本操作,请求信息:{2}", task.task_id,
- task.device_id,
- json.dumps(task.to_dict(), ensure_ascii=False))
- return watch_novel_task(task.task_id, task.request_data)
- # 头条吃饭补贴任务
- class toutiao_meal_allwance_task_interaction_operation(rpa_processor_strategy):
- def process_rpa(self, task):
- loggerKit.info("taskId:{0},设备号:{1},开始进行头条吃饭补贴脚本操作,请求信息:{2}", task.task_id,
- task.device_id,
- json.dumps(task.to_dict(), ensure_ascii=False))
- return meal_allowance_task(task.task_id, task.request_data)
- # 抖音用户demo任务
- class douyin_user_demo_operation(rpa_processor_strategy):
- def process_rpa(self, task):
- loggerKit.info("taskId:{0},设备号:{1},开始进行抖音搜一搜赚钱脚本操作,请求信息:{2}", task.task_id,
- task.device_id,
- json.dumps(task.to_dict(), ensure_ascii=False))
- return douyin_user_demo(task.task_id, task.resource_name, task.request_data)
- """
- 抖音极速版
- 随机搜索
- """
- class douyin_random_search_operation(rpa_processor_strategy):
- def process_rpa(self, task):
- loggerKit.info("taskId:{0},设备号:{1},开始进行抖音随机搜索操作,请求信息:{2}", task.task_id, task.device_id,
- json.dumps(task.to_dict(), ensure_ascii=False))
- return douyin_random_search(task.task_id, task.resource_name, task.request_data)
- # 懂车帝签到操作
- class dongchedi_sign_operation(rpa_processor_strategy):
- def process_rpa(self, task):
- loggerKit.info("taskId:{0},设备号:{1},开始进行懂车帝签到操作,请求信息:{2}", task.task_id, task.device_id,
- json.dumps(task.to_dict(), ensure_ascii=False))
- return dongchedi_sign(task.task_id, task.resource_name, task.request_data)
- """
- 懂车帝简单互动
- """
- class dongchedi_simple_comment(rpa_processor_strategy):
- def process_rpa(self, task):
- loggerKit.info("taskId:{0},设备号:{1},开始进行懂车帝简单互动操作,请求信息:{2}", task.task_id, task.device_id,
- json.dumps(task.to_dict(), ensure_ascii=False))
- return simple_comment(task.task_id, task.resource_name, task.request_data)
- """
- 快手简单养号
- """
- class kuaishou_random_watch_video_operation(rpa_processor_strategy):
- def process_rpa(self, task):
- loggerKit.info("taskId:{0},设备号:{1},开始进行快手简单养号操作,请求信息:{2}", task.task_id, task.device_id,
- json.dumps(task.to_dict(), ensure_ascii=False))
- return gifmaker_random_watch_video(task.task_id, task.resource_name, task.request_data)
- """
- 小红书简单养号
- """
- class xiaohongshu_random_watch_video_operation(rpa_processor_strategy):
- def process_rpa(self, task):
- loggerKit.info("taskId:{0},设备号:{1},开始进行快手简单养号操作,请求信息:{2}", task.task_id, task.device_id,
- json.dumps(task.to_dict(), ensure_ascii=False))
- return xingin_random_watch_video(task.task_id, task.resource_name, task.request_data)
- """
- 易车官媒
- """
- class yiche_direction_action_operation(rpa_processor_strategy):
- def process_rpa(self, task):
- loggerKit.info("taskId:{0},设备号:{1},开始进行易车官媒操作,请求信息:{2}", task.task_id, task.device_id,
- json.dumps(task.to_dict(), ensure_ascii=False))
- return yiche_direction_action(task.task_id, task.resource_name, task.request_data)
- """
- 汽车之家官媒
- """
- class cubic_simple_action_operation(rpa_processor_strategy):
- def process_rpa(self, task):
- loggerKit.info("taskId:{0},设备号:{1},开始进行汽车之家官媒操作,请求信息:{2}", task.task_id, task.device_id,
- json.dumps(task.to_dict(), ensure_ascii=False))
- return cubic_simple_action(task.task_id, task.resource_name, task.request_data)
- """
- 抖音常规版官媒
- """
- class douyin_random_watch_video_operation(rpa_processor_strategy):
- def process_rpa(self, task):
- loggerKit.info("taskId:{0},设备号:{1},开始进行快手简单养号操作,请求信息:{2}", task.task_id, task.device_id,
- json.dumps(task.to_dict(), ensure_ascii=False))
- return douyin_random_watch_video(task.task_id, task.resource_name, task.request_data)
- # 抖音养号首页(浏览30 - 50视频随机时长和点赞收藏)
- class douyin_train_home_page_operation(rpa_processor_strategy):
- def process_rpa(self, task):
- loggerKit.info("taskId:{0},设备号:{1},开始进行懂车帝签到操作,请求信息:{2}", task.task_id, task.device_id,
- json.dumps(task.to_dict(), ensure_ascii=False))
- return douyin_train_home_page(task.task_id, task.resource_name, task.request_data)
- # 抖音养号首页帖子评论(浏览30-50帖子 随机时长 对其评论进行观看)
- class douyin_train_home_comment_operation(rpa_processor_strategy):
- def process_rpa(self, task):
- loggerKit.info("taskId:{0},设备号:{1},开始进行懂车帝签到操作,请求信息:{2}", task.task_id, task.device_id,
- json.dumps(task.to_dict(), ensure_ascii=False))
- return douyin_train_home_comment(task.task_id, task.resource_name, task.request_data)
- # 抖音紧急互动
- class douyin_urgent_interact_operation(rpa_processor_strategy):
- def process_rpa(self, task):
- loggerKit.info("taskId:{0},设备号:{1},开始进行懂车帝签到操作,请求信息:{2}", task.task_id, task.device_id,
- json.dumps(task.to_dict(), ensure_ascii=False))
- return douyin_interact_urgent(task.task_id, task.resource_name, task.request_data)
- # 抖音紧急互动
- class douyin_live_operation(rpa_processor_strategy):
- def process_rpa(self, task):
- loggerKit.info("taskId:{0},设备号:{1},开始进行懂车帝签到操作,请求信息:{2}", task.task_id, task.device_id,
- json.dumps(task.to_dict(), ensure_ascii=False))
- return douyin_live_comment(task.task_id, task.resource_name, task.request_data)
- # 定义上下文类
- class rpa_processor:
- def __init__(self, strategy):
- self.strategy = strategy
- def set_strategy(self, strategy):
- self.strategy = strategy
- def process_rpa(self, task):
- return self.strategy.process_rpa(task)
- class rpa_handler:
- @staticmethod
- def set_processor(task):
- loggerKit.info(
- 'media_channel:{0}, action_type:{1},answer_type:{2}, task_type:{3} task_sub_type:{4} ,task_id:{5}',
- task.media_channel, task.action_type,
- task.answer_type, task.task_type, task.task_sub_type, task.task_id)
- default_strategy = None # 默认策略类
- # 添加其他映射关系
- strategy_mapping = {
- (media_type.douyin.value, task_sub_type.douyin_sign_browser_video.value): douyin_browser_operation(),
- (media_type.douyin.value, task_sub_type.douyin_sign.value): douyin_sign_operation(),
- (media_type.douyin.value, task_sub_type.douyin_watch_advise.value): douyin_watch_advise_operation(),
- (media_type.douyin.value, task_sub_type.douyin_boxes_advise.value): douyin_boxes_advise_operation(),
- (media_type.dongchedi.value,
- task_sub_type.content_interaction.value): dongchedi_content_interaction_operation(),
- (media_type.toutiao.value,
- task_sub_type.toutiao_search_task.value): toutiao_search_task_interaction_operation(),
- (media_type.toutiao.value,
- task_sub_type.toutiao_search_random_task.value): toutiao_search_random_task_interaction_operation(),
- (media_type.toutiao.value,
- task_sub_type.toutiao_watch_novel_task.value): toutiao_watch_novel_task_interaction_operation(),
- (media_type.toutiao.value,
- task_sub_type.toutiao_meal_allowance_task.value): toutiao_meal_allwance_task_interaction_operation(),
- (media_type.douyin.value, task_sub_type.douyin_user_demo.value): douyin_user_demo_operation(),
- (media_type.dongchedi.value, task_sub_type.dongchedi_sign.value): dongchedi_sign_operation(),
- (media_type.douyin.value, task_sub_type.douyin_random_search.value): douyin_random_search_operation(),
- (media_type.douyin.value,
- task_sub_type.douyin_train_home_comment.value): douyin_train_home_comment_operation(),
- (media_type.douyin.value, task_sub_type.douyin_train_home_page.value): douyin_train_home_page_operation(),
- (media_type.douyin.value, task_sub_type.douyin_urgent_interact.value): douyin_urgent_interact_operation(),
- (media_type.dongchedi.value, task_sub_type.dongchedi_comment.value): dongchedi_simple_comment(),
- (media_type.kuaishou.value, task_sub_type.kuaishou_train_home.value): kuaishou_random_watch_video_operation(),
- (media_type.xiaohongshu.value,
- task_sub_type.xiaohongshu.xiaohongshu_train_home.value): xiaohongshu_random_watch_video_operation(),
- (media_type.douyin.value, task_sub_type.douyin_office_media_operator.value): douyin_random_watch_video_operation(),
- (media_type.yiche.value, task_sub_type.yi_che_office_media_operator.value): yiche_direction_action_operation(),
- (media_type.auto_home.value, task_sub_type.auto_home_office_media_operator.value): cubic_simple_action_operation(),
- (media_type.douyin.value, task_sub_type.douyin_live_media_Interaction.value): douyin_live_operation()
- }
- # 根据task.action_type和task.answer_type获取相应的策略类
- strategy = strategy_mapping.get((task.media_channel, task.task_sub_type), default_strategy)
- if strategy is None:
- # 处理没有匹配到任何策略类的情况
- raise ValueError("No strategy found for the given task")
- we_strategy = rpa_processor(strategy)
- return we_strategy
|