| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297 |
- import time
- from func.action_func import del_key_vague
- from task.task_job import callback_task
- from tools import loggerKit, redis_client
- from tools.pic_base64_util import pic_to_base64
- from scene.oprator.atom_data import start_app, stop_app, continual_swipe_screen, single_click_by_control, \
- check_pic_exist, \
- click_pic, get_content_by_control, swipe_screen
- import requests
- import json
- import threading
- import yaml
- with open('config.yaml', 'r') as file:
- config = yaml.load(file, Loader=yaml.FullLoader)
- extern_domain = config['bmp-cp']['extern_domain']
- # 任务执行回调url
- task_callback_url = extern_domain + config['bmp-cp']['task_callback_url']
- # 抖音极速版看广告
- # version 28.8.0
- def douyin_spider(device_serial, task_id, keyword, media_channel, data):
- loggerKit.info('请求信息:{0}'.format(data))
- device_id = data.get("deviceID")
- perform_action_id = data.get("performActionId")
- result = data.get("result")
- if result is not None:
- """
- 非首个指令
- """
- perform_action_result = result.get("performActionResult")
- if perform_action_result is None:
- return_dict = {
- "data": "",
- "code": -2,
- "message": "fail, performActionResult is null"
- }
- # 回调任务中心
- del_key_vague(device_id)
- callback_task(500, '指令执行失败', task_id, device_id, 0, None)
- return return_dict
- if perform_action_result == "picNotFound":
- # 回调任务中心
- return_dict = {
- "data": "",
- "code": -2,
- "message": "fail, performActionResult is not success"
- }
- del_key_vague(device_id)
- callback_task(500, '任务执行失败,可能当前无法再次执行看广告任务', task_id, device_id, 0, None)
- return return_dict
- if perform_action_result != "success":
- # 回调任务中心
- return_dict = {
- "data": "",
- "code": -2,
- "message": "fail, performActionResult is not success"
- }
- del_key_vague(device_id)
- callback_task(500, '任务执行失败,可能当前无法再次执行看广告任务', task_id, device_id, 0, None)
- return return_dict
- # 每次操作完成后会将对应的操作唯一id存储到redis,并且返回给手机端 手机端下次带着上个操作id来执行下一个操作
- last_action_id = redis_client.get(device_id + "operate")
- step0 = redis_client.get(f"{device_id}_step0")
- if step0 is not None and int(step0) == 1 and last_action_id is not None and int(perform_action_id) == int(
- last_action_id) and perform_action_result == "success":
- action1_id = int(round(time.time() * 1000))
- """
- 发送第1条指令
- 点击急速版抖音赚钱按键
- """
- action1_dict = single_click_by_control(action1_id, target_app="douyin", target_version="28.8.0",
- package_name="com.ss.android.ugc.aweme.lite",
- control_id="com.ss.android.ugc.aweme.lite:id/hvy",
- control_ids="com.ss.android.ugc.aweme.lite:id/hvy,"
- "com.ss.android.ugc.aweme.lite:id/lf3")
- redis_client.set(device_id + "operate", action1_id)
- redis_client.set(f"{device_id}_step1", "1")
- redis_client.delete(f"{device_id}_step0")
- loggerKit.info("设备:{0}, action1_id:{1}", device_id, action1_id)
- return action1_dict
- step1 = redis_client.get(f"{device_id}_step1")
- # 进入看广告任务标识
- ready_advise_path = 'ready_advise.png'
- if step1 is not None and int(step1) == 1 and last_action_id is not None and int(perform_action_id) == int(
- last_action_id) and perform_action_result == "success":
- loggerKit.info("设备:{0}, action1_id_mem:{1}", device_id, int(last_action_id))
- action2_id = int(round(time.time() * 1000))
- """
- 发送第2条指令
- 查找进入看广告标识图片 向下最多滑动三次获取
- """
- sign_ready_base64 = pic_to_base64(ready_advise_path)
- action2_dict = check_pic_exist(action2_id, target_app="douyin", target_version="28.8.0",
- package_name="com.ss.android.ugc.aweme.lite",
- pic_base64=sign_ready_base64, swipe_count=3)
- redis_client.set(device_id + "operate", action2_id)
- redis_client.set(f"{device_id}_step2", "1")
- redis_client.delete(f"{device_id}_step1")
- loggerKit.info("设备:{0}, action2_id:{1}", device_id, action2_id)
- return action2_dict
- step2 = redis_client.get(f"{device_id}_step2")
- if step2 is not None and int(step2) == 1 and last_action_id is not None and int(perform_action_id) == int(
- last_action_id) \
- and perform_action_result == "success":
- loggerKit.info("设备:{0}, action2_id_mem:{1}", device_id, int(last_action_id))
- action3_id = int(round(time.time() * 1000))
- """
- 发送第3条指令
- 点击看广告标识 进行看广告动作
- """
- sign_ready_base64 = pic_to_base64(ready_advise_path)
- action3_dict = click_pic(action3_id, target_app="douyin", target_version="28.8.0",
- package_name="com.ss.android.ugc.aweme.lite",
- pic_base64=sign_ready_base64)
- redis_client.set(device_id + "operate", action3_id)
- redis_client.set(f"{device_id}_step3", "1")
- redis_client.delete(f"{device_id}_step2")
- loggerKit.info("设备:{0}, action3_id:{1}", device_id, action3_id)
- return action3_dict
- step3 = redis_client.get(f"{device_id}_step3")
- if step3 is not None and int(step3) == 1 and last_action_id is not None and int(perform_action_id) == int(
- last_action_id) \
- and perform_action_result == "success":
- loggerKit.info("设备:{0}, action3_id_mem:{1}", device_id, int(last_action_id))
- action4_id = int(round(time.time() * 1000))
- """
- 发送第4条指令
- 判断是否有返回元素 某些账号观看广告是下滑瀑布流(小o2024)需要进行下滑处理
- """
- action3_dict = get_content_by_control(action4_id, target_app="douyin", target_version="28.8.0",
- max_page="1",
- package_name="com.ss.android.ugc.aweme.lite", operator_type=1,
- title="返回")
- redis_client.set(device_id + "operate", action4_id)
- redis_client.set(f"{device_id}_step4", "1")
- redis_client.delete(f"{device_id}_step3")
- loggerKit.info("设备:{0}, action4_id:{1}", device_id, action4_id)
- return action3_dict
- """
- 根据返回 进行持续下滑或不操作
- """
- step4 = redis_client.get(f"{device_id}_step4")
- if step4 is not None and int(step4) == 1 and last_action_id is not None and int(perform_action_id) == int(
- last_action_id):
- loggerKit.info("设备:{0}, action3_id_mem:{1}", device_id, int(last_action_id))
- action5_id = int(round(time.time() * 1000))
- """
- 发送第4条指令
- 判断是否有返回元素 某些账号观看广告是下滑瀑布流(小o2024)需要进行下滑处理
- """
- if perform_action_result == 'eleNotFound':
- action3_dict = swipe_screen(action5_id,
- package_name="com.ss.android.ugc.aweme.lite", sleep_time=70)
- else:
- action3_dict = continual_swipe_screen(action5_id,
- package_name="com.ss.android.ugc.aweme.lite",
- continuous_time=90)
- redis_client.set(device_id + "operate", action5_id)
- redis_client.set(f"{device_id}_step5", "1")
- redis_client.delete(f"{device_id}_step4")
- loggerKit.info("设备:{0}, action5_id:{1}", device_id, action5_id)
- return action3_dict
- step5 = redis_client.get(f"{device_id}_step5")
- if step5 is not None and int(step5) == 1 and last_action_id is not None and int(perform_action_id) == int(
- last_action_id) and perform_action_result == "success":
- loggerKit.info("设备:{0}, action10_id_mem:{1}", device_id, int(last_action_id))
- action5_id = int(round(time.time() * 1000))
- """
- 停止指令
- 停止app
- """
- action5_dict = stop_app(action5_id, target_app="douyin", target_version="28.8.0",
- package_name="com.ss.android.ugc.aweme.lite")
- del_key_vague(device_id)
- loggerKit.info("设备:{0}, action12_id:{1}", device_id, action5_dict)
- # 回调任务中心修改任务状态
- callback_task(None, None, task_id, device_id, 1, None)
- return action5_dict
- else:
- action0_id = int(round(time.time() * 1000))
- """
- 启动指令
- 启动app
- """
- action0_dict = start_app(action0_id, target_app="douyin", target_version="28.8.0",
- package_name="com.ss.android.ugc.aweme.lite")
- redis_client.set(device_id + "operate", action0_id)
- redis_client.set(f"{device_id}_step0", "1")
- redis_client.delete(f"{device_id}_step9")
- loggerKit.info("设备:{0}, action0_id:{1}", device_id, action0_id)
- return action0_dict
- # 批量模糊删除缓存
- # def del_key_vague(device_id):
- # # 批量模糊删除keys
- # keys = redis_client.match_pattern_prefix(device_id)
- # if len(keys) > 0:
- # # 需要判断是否有匹配的值, 没有的话会报错
- # for key in keys:
- # redis_client.delete(key)
- # loggerKit.info(f"clear {device_id} keys success...")
- # else:
- # loggerKit.info(f"{device_id} keys none ...")
- # post请求
- # def call_back_request(error_code, error_msg, task_id, task_status, task_execute_response, content, demo_flag):
- # response = {
- # "errorCode": error_code,
- # "errorMsg": error_msg,
- # "taskId": task_id,
- # "taskStatus": task_status,
- # "taskExecuteResponse": task_execute_response,
- # "content": content,
- # "demoFlag": demo_flag
- # }
- # return response
- #
- #
- # # 回调任务中心接口
- # def callback_task(err_code, err_msg, task_id, device_id, execute_status, result):
- # callback_request2 = call_back_request(err_code, err_msg, task_id, execute_status, result, None, None)
- # loggerKit.info("thread[{0}=>{1}], taskId:{2}, 设备号:{3}, url:{4} 。回调开始", threading.current_thread().name,
- # threading.get_ident(), task_id, device_id, task_callback_url)
- # current_timeout = (5, 10)
- # callback_response2 = requests.post(task_callback_url, json=callback_request2, timeout=current_timeout)
- # loggerKit.info("thread[{0}=>{1}], taskId:{2},设备号:{3}。回调结束:{4}", threading.current_thread().name,
- # threading.get_ident(), task_id, device_id, json.dumps(callback_response2.text, ensure_ascii=False))
- # return callback_response2
|