| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367 |
- import random
- import subprocess
- import time
- import uiautomator2 as u2
- from uiautomator2 import Direction
- from tools import loggerKit
- from tools.common_util import cap_device
- class shell_status_content:
- def __init__(self, execute_status, error_code, error_msg, result, content, nickname):
- """
- :rtype: object
- """
- # 0失败 1成功
- self.execute_status = execute_status
- self.error_code = error_code
- self.error_msg = error_msg
- self.result = result
- self.content = content
- self.nickname = nickname
- def to_dict(self):
- return {
- 'execute_status': self.execute_status,
- 'error_code': self.error_code,
- 'error_msg': self.error_msg,
- 'result': self.result,
- 'content': self.content,
- 'nickname': self.nickname
- }
- # 保活任务 不打开app
- def keep_alive_sub_type(device):
- cmd_conn = f"adb connect {device}"
- conn_output = subprocess.check_output(cmd_conn, shell=True)
- print(f"{conn_output}")
- d = u2.connect(device)
- d.screen_on()
- d.unlock()
- d.debug = False
- swipe_list = ['right', 'left']
- d.swipe_ext(random.choice(swipe_list), 1)
- status = shell_status_content(1, 0, '', '', '', '')
- return status
- # 懂车帝 v7.9.0
- # 懂车帝demo
- def simulated_demo_operation(device, task_id):
- cmd_conn = f"adb connect {device}"
- conn_output = subprocess.check_output(cmd_conn, shell=True)
- print(f"{conn_output}")
- d = u2.connect(device)
- d.screen_on()
- d.unlock()
- d.debug = False
- d.app_stop("com.ss.android.auto")
- d.app_start('com.ss.android.auto', use_monkey=True)
- time.sleep(2)
- d.xpath('//*[@resource-id="com.ss.android.auto:id/gxf"]').click_exists(timeout=0.25)
- swipe_screen(d)
- swipe_screen(d)
- time.sleep(1)
- # 如果弹出问答页面 处理问答页面
- # handleQuestion(d)
- # 模拟随机滑动几屏 随机浏览文章
- # randomView(d)
- # 搜索关键词
- searchKeyword(d, task_id)
- status = shell_status_content(1, 0, '', '', '', '')
- return status
- # 懂车帝 v7.9.0
- # 3-5 天的养号操作
- def simulated_operation(device, title, task_id):
- cmd_conn = f"adb connect {device}"
- conn_output = subprocess.check_output(cmd_conn, shell=True)
- loggerKit.info(f"{conn_output}")
- d = u2.connect(device)
- d.screen_on()
- d.unlock()
- d.debug = False
- try:
- d.app_stop("com.ss.android.auto")
- d.app_start('com.ss.android.auto', use_monkey=True)
- # 跳过开屏广告
- d.watcher.when("跳过广告").click()
- # 跳过升级弹框
- d.watcher.when("以后再说").click()
- # 位置授权
- d.watcher.when("本次运行允许").click()
- # 稍后
- d.watcher.when("稍后").click()
- # 开始后台监控
- # 默认监控间隔2.0s
- d.watcher.start(2.0)
- # 进入我的获取账号信息
- d.xpath('//*[@resource-id="android:id/tabs"]/android.widget.RelativeLayout[5]').click()
- time.sleep(2)
- account_name = ""
- if d.xpath('//*[@resource-id="com.ss.android.auto:id/god"]').exists:
- account_name = d.xpath('//*[@resource-id="com.ss.android.auto:id/god"]').get_text()
- # 点击到首页
- # d.xpath('//*[@resource-id="android:id/tabs"]/android.widget.RelativeLayout[1]').click()
- if d.xpath('//*[@text="首页"]').exists:
- d.xpath('//*[@text="首页"]').click()
- else:
- d.xpath('//*[@resource-id="android:id/tabs"]/android.widget.RelativeLayout[1]').click()
- time.sleep(2)
- d.xpath('//*[@resource-id="com.ss.android.auto:id/gxf"]').click_exists(timeout=0.25)
- swipe_screen(d)
- swipe_screen(d)
- time.sleep(1)
- # 如果弹出问答页面 处理问答页面
- handleQuestion(d)
- # 模拟随机滑动几屏 随机浏览文章
- randomView(d)
- time.sleep(3)
- # 搜索关键词
- searchKeyword(d, title)
- # 点击车友圈按钮
- d.xpath('//*[@text="车友圈"]').click()
- time.sleep(3)
- # 进入车友圈
- if d.xpath('//*[@text="进入车友圈"]').exists:
- d.xpath('//*[@text="进入车友圈"]').click()
- else:
- d.xpath('//*[@text="进圈"]').click()
- time.sleep(2)
- # 点击加圈
- circleTextView = d(resourceId="com.ss.android.auto:id/ief")
- if circleTextView.exists:
- text = circleTextView.get_text()
- if text == "加圈":
- circleTextView.click_exists(timeout=1)
- time.sleep(0.5)
- carFriendWindow = d(resourceId="com.ss.android.auto:id/ala")
- if carFriendWindow.exists:
- d.xpath('//*[@resource-id="com.ss.android.auto:id/bpd"]').click_exists(timeout=1)
- # 切换到精华tab
- # d.xpath('//*[@text="精华"]').click_exists(timeout=2)
- # time.sleep(0.25)
- currentNum = 0
- # 滑动几屏
- while currentNum < random.randint(3, 5):
- d.swipe_ext(Direction.FORWARD)
- currentNum += 1
- time.sleep(random.random())
- # 随机滑动点击帖子
- click = 0
- while click <= random.randint(3, 5):
- print("each = ", click)
- readPost(d)
- click += 1
- d.swipe_ext(Direction.FORWARD)
- # 切换到热门帖子
- hotTabView = d.xpath('//*[@text="热门"]')
- if hotTabView.exists:
- hotTabView.click(timeout=1)
- time.sleep(0.25)
- # 随机滑动热门列表列表 、点击帖子
- for each in range(1, 4):
- readPost(d)
- d.swipe_ext(Direction.FORWARD)
- status = shell_status_content(1, 0, '', '', '', account_name)
- d.app_stop("com.ss.android.auto")
- return status
- except Exception as e:
- cap_device(d, device, task_id)
- exception_status = shell_status_content(0, 500, str(e), '', '', '')
- d.app_stop("com.ss.android.auto")
- loggerKit.error(f"设备{device}, 任务{task_id}, dongchedi v7.9.0 自动化养号 异常: {str(e)}")
- return exception_status
- # 搜索
- def searchKeyword(d, keyword):
- if d.xpath('//*[@resource-id="com.ss.android.auto:id/c9c"]').exists:
- d.xpath('//*[@resource-id="com.ss.android.auto:id/c9c"]').click()
- else:
- d.xpath('//*[@resource-id="com.ss.android.auto:id/d34"]').click()
- d.send_keys(keyword, clear=True)
- time.sleep(1)
- # 点击搜索
- d.xpath('//*[@resource-id="com.ss.android.auto:id/g9e"]').click(timeout=5)
- time.sleep(2)
- # 随机浏览
- def randomView(d):
- recyclerView = d(resourceId="com.ss.android.auto:id/hgo", className="androidx.recyclerview.widget.RecyclerView")
- if not recyclerView.exists:
- d.swipe_ext("right", 1)
- return
- num = 0
- # 循环随机浏览
- while True:
- if num >= 3:
- break
- num += 1
- layoutG = recyclerView.child(resourceId="com.ss.android.auto:id/dv6", className="android.widget.LinearLayout")
- if not layoutG.exists:
- print("dv_ layout 不存在 则滑动")
- swipe_screen(d)
- continue
- i = random.randint(0, len(layoutG) - 1)
- layoutEle = layoutG[i]
- # 点击对应的控件 可能是帖子 视频 图片等
- layoutEle.click_exists(timeout=1)
- # 进入详情之后之后
- forwardNo = random.randint(1, 5)
- # 随机向下滑动 forwardNo 次
- j = 0
- while j <= forwardNo:
- imageView = d(resourceId="com.ss.android.auto:id/cz")
- if imageView.exists:
- print("点击的图片 不滑动 直接结束")
- time.sleep(1)
- break
- d.swipe_ext(Direction.FORWARD)
- j += 1
- time.sleep(0.25)
- # 随机睡眠 1 ~ 5秒
- time.sleep(random.randint(1, 5))
- # 回到推荐列表
- d.swipe_ext("right", 1)
- time.sleep(1)
- swipe_screen(d)
- time.sleep(0.5)
- # 处理问答
- def handleQuestion(d):
- question = d(resourceId="athena-question-7130163961458065439")
- if question.exists:
- questionViewG = question.child(className="android.view.View")
- if questionViewG.exists:
- d.xpath('//*[@resource-id="athena-question-7130163961458065439"]/android.view.View[3]').click()
- d.xpath('//*[@text="提交"]').click()
- # 进入文章进行处理
- def readPost(d):
- # 判断右上角是否存在三个点
- # 车友圈
- recyclerView = d(resourceId="com.ss.android.auto:id/dv6")
- if recyclerView.exists:
- recyclerView.click()
- else:
- return
- time.sleep(2)
- if d.xpath('//*[@resource-id="com.ss.android.auto:id/iv_more"]').exists:
- print('进入车友圈')
- swipe = 0
- while swipe <= random.randint(5, 8):
- time.sleep(1)
- d.swipe_ext("up", 1)
- swipe += 1
- d.swipe_ext("right", 1)
- # 提问
- elif d.xpath('//*[@resource-id="com.ss.android.auto:id/kad"]').exists:
- print('进入提问')
- swipe = 0
- while swipe <= random.randint(5, 8):
- time.sleep(1)
- d.swipe_ext("up", 1)
- swipe += 1
- d.swipe_ext("right", 1)
- # 长文
- elif d.xpath('//*[@resource-id="com.ss.android.auto:id/ee1"]').exists:
- print('进入长文')
- swipe = 0
- while swipe <= random.randint(5, 8):
- time.sleep(1)
- d.swipe_ext("up", 1)
- swipe += 1
- d.swipe_ext("right", 1)
- elif d.xpath('//*[@resource-id="com.ss.android.auto:id/c5n"]').exists:
- # 短视频
- print('进入短视频')
- random_sleep = random.randint(10, 20)
- time.sleep(random_sleep)
- d.swipe_ext("right", 1)
- else:
- print('进入长视频')
- random_sleep = random.randint(10, 20)
- time.sleep(random_sleep)
- d.swipe_ext("right", 1)
- # 随机滑动屏幕
- def swipe_screen(d):
- # 获取设备的屏幕尺寸
- screen_width = d.info["displayWidth"]
- screen_height = d.info["displayHeight"]
- print("screen_width 屏幕的宽度:", screen_width, "屏幕的长度:", screen_height)
- # 设置滑动的起始点和终止点,可以根据需要进行调整
- start_x = screen_width // 2 # 屏幕宽度的一半
- start_y = random.randint(1000, screen_height) # 起始点纵坐标,可以根据需要调整
- end_x = screen_width // 2
- end_y = random.randint(10, 300)
- # 执行滑动操作
- d.swipe(start_x, start_y, end_x, end_y)
- # d.swipe_ext(Direction.FORWARD)
- # 等待滑动完成,可以根据需要进行适当的等待
- time.sleep(1)
- if __name__ == "__main__":
- simulated_operation("QXNUT21905001550", "飞凡F7", 12121)
|