import random import threading import time import uiautomator2 as u2 from func.check_device import retry, reboot_device from task.task_dict import script_status from tools import loggerKit # 4.1.3(21966) # 编辑群名称 def wework_edit(device_serial, keyword, content): # 连接设备 device = u2.connect(device_serial) device.healthcheck() device.screen_on() device.unlock() device.debug = False # 重启企业微信 device.app_stop("com.tencent.wework") device.app_start("com.tencent.wework") # 打开搜索框 device.xpath('//*[@resource-id="com.tencent.wework:id/l76"]').click() # 输入搜索文案 device.xpath('//*[@resource-id="com.tencent.wework:id/jfi"]').set_text(keyword) # 点击搜索所得的第一个 device.xpath('//*[@resource-id="com.tencent.wework:id/ge2"]').click() # 点击编辑群按钮 device.xpath('//*[@resource-id="com.tencent.wework:id/l6w"]').click() # 点击群名称 device.xpath('//*[@resource-id="com.tencent.wework:id/bze"]').click() # 文案设置 device.xpath('//*[@resource-id="com.tencent.wework:id/egb"]').set_text(content) # 编辑提交 device.xpath('//*[@resource-id="com.tencent.wework:id/l73"]').click() # 创建字典来保存锁对象 lock_dict = {} sleep_time = 0.5 def random_add_subtract(number): operation = random.choice(['add']) if operation == 'add': number += random.choice([0.1, 0.2]) elif operation == 'subtract': number += random.choice([0.1, 0.2]) return number # 消息回复 def wework_reply(device_serial, keyword, content): status = script_status(1, 0, '', '') # 连接设备 device = u2.connect(device_serial) device.healthcheck() device.screen_on() device.unlock() device.debug = False time.sleep(random_add_subtract(sleep_time)) # 打开搜索框 loggerKit.info('thread[{0}=>{1}], 设备号: {2}, 打开搜索框', threading.current_thread().name, threading.get_ident(), device_serial) start_time1 = time.time() if device.xpath('//*[@resource-id="com.tencent.wework:id/l76"]').exists: device.xpath('//*[@resource-id="com.tencent.wework:id/l76"]').click() else: return retry(lambda: device.xpath('//*[@resource-id="com.tencent.wework:id/l76"]').click(), device_serial, "搜索按钮元素未获取到") end_time1 = time.time() loggerKit.info("thread[{0}=>{1}], 设备号:{2}, 打开搜索框==>>执行时长:{3}", threading.current_thread().name, threading.get_ident(), device_serial, end_time1 - start_time1) time.sleep(random_add_subtract(sleep_time)) # 输入搜索文案 loggerKit.info('thread[{0}=>{1}], 输入搜索文案:{2}, 设备号:{3}', threading.current_thread().name, threading.get_ident(), keyword, device_serial) start_time2 = time.time() if device.xpath('//*[@resource-id="com.tencent.wework:id/jfi"]').exists: device.xpath('//*[@resource-id="com.tencent.wework:id/jfi"]').set_text(keyword) else: return retry(lambda: device.xpath('//*[@resource-id="com.tencent.wework:id/jfi"]').set_text(keyword), device_serial, "搜索框元素未获取到") end_time2 = time.time() loggerKit.info("thread[{0}=>{1}], 设备号:{2}, 输入搜索文案==>>执行时长:{3}", threading.current_thread().name, threading.get_ident(), device_serial, end_time2 - start_time2) time.sleep(random_add_subtract(sleep_time)) loggerKit.info('thread[{0}=>{1}], 点击搜索所得的第一个', threading.current_thread().name, threading.get_ident()) start_time3 = time.time() time.sleep(1) if device.xpath('//*[@resource-id="com.tencent.wework:id/kzx"]').exists: device.xpath('//*[@resource-id="com.tencent.wework:id/ge2"]').click() else: return retry(lambda: device.xpath('//*[@resource-id="com.tencent.wework:id/ge2"]').click(), device_serial, "搜索不到群聊") end_time3 = time.time() loggerKit.info("thread[{0}=>{1}], 设备号:{2}, 点击搜索所得的第一个==>>执行时长:{3}", threading.current_thread().name, threading.get_ident(), device_serial, end_time3 - start_time3) time.sleep(random_add_subtract(sleep_time)) # 编辑框 loggerKit.info('thread[{0}=>{1}], 设备号:{2}, 输入编辑框文本', threading.current_thread().name, threading.get_ident(), device_serial) start_time4 = time.time() if device.xpath('//*[@resource-id="com.tencent.wework:id/gsj"]').exists: device.xpath('//*[@resource-id="com.tencent.wework:id/gsj"]').click() else: return retry(lambda: device.xpath('//*[@resource-id="com.tencent.wework:id/gsj"]').click(), device_serial, "编辑框元素未获取到") end_time4 = time.time() loggerKit.info("thread[{0}=>{1}], 设备号:{2}, 编辑框==>>执行时长:{3}", threading.current_thread().name, threading.get_ident(), device_serial, end_time4 - start_time4) time.sleep(random_add_subtract(sleep_time)) # 内容赋值 text = content.replace("
", "\n") # 直接输出替换后的字符串 loggerKit.info('替换后的字符串:{0}', text) # 内容赋值 device.xpath('//*[@resource-id="com.tencent.wework:id/gsj"]').set_text(text) # 发送 loggerKit.info('thread[{0}=>{1}], 设备号:{2}, 发送文案', threading.current_thread().name, threading.get_ident(), device_serial) start_time5 = time.time() if device.xpath('//*[@resource-id="com.tencent.wework:id/gsf"]').exists: device.xpath('//*[@resource-id="com.tencent.wework:id/gsf"]').click() else: return retry(lambda: device.xpath('//*[@resource-id="com.tencent.wework:id/gsf"]').click(), device_serial, "发送按钮元素未获取到") end_time5 = time.time() loggerKit.info("thread[{0}=>{1}], 设备号:{2}, 发送==>>执行时长:{3}", threading.current_thread().name, threading.get_ident(), device_serial, end_time5 - start_time5) time.sleep(random_add_subtract(sleep_time)) # 退出会话 loggerKit.info('thread[{0}=>{1}], 设备号:{2}, 退出会话', threading.current_thread().name, threading.get_ident(), device_serial) time.sleep(random_add_subtract(sleep_time)) start_time6 = time.time() device.xpath('//*[@resource-id="com.tencent.wework:id/l67"]').click() end_time6 = time.time() loggerKit.info("thread[{0}=>{1}], 设备号:{2}, 退出会话==>>执行时长:{3}", threading.current_thread().name, threading.get_ident(), device_serial, end_time6 - start_time6) # 退出搜索 loggerKit.info('thread[{0}=>{1}], 设备号:{2}, 退出搜索', threading.current_thread().name, threading.get_ident(), device_serial) time.sleep(random_add_subtract(sleep_time)) start_time7 = time.time() device.xpath('//*[@resource-id="com.tencent.wework:id/l67"]').click() end_time7 = time.time() loggerKit.info("thread[{0}=>{1}], 设备号:{2}, 退出搜索==>>执行时长:{3}", threading.current_thread().name, threading.get_ident(), device_serial, end_time7 - start_time7) return status # 指定人消息回复 def wework_appoint_reply(device_serial, keyword, username, content): # 连接设备 device = u2.connect(device_serial) status = script_status(1, 0, '', '') # 打开搜索框 loggerKit.info('thread[{0}=>{1}], 设备号: {2}, 打开搜索框', threading.current_thread().name, threading.get_ident(), device_serial) if device.xpath('//*[@resource-id="com.tencent.wework:id/l76"]').exists: device.xpath('//*[@resource-id="com.tencent.wework:id/l76"]').click() else: status = script_status(0, 500, '搜索按钮元素未获取到', '') reboot_device(device_serial, '搜索按钮元素未获取到') return status time.sleep(random_add_subtract(sleep_time)) # 输入搜索文案 loggerKit.info('thread[{0}=>{1}], 输入搜索文案:{2}, 设备号:{3}', threading.current_thread().name, threading.get_ident(), keyword, device_serial) if device.xpath('//*[@resource-id="com.tencent.wework:id/jfi"]').exists: device.xpath('//*[@resource-id="com.tencent.wework:id/jfi"]').set_text(keyword) else: status = script_status(0, 500, '搜索框元素未获取到', '') reboot_device(device_serial, '搜索框元素未获取到') return status time.sleep(1) # 点击搜索所得的第一个 if device.xpath('//*[@resource-id="com.tencent.wework:id/kzx"]').exists: device.xpath('//*[@resource-id="com.tencent.wework:id/ge2"]').click() else: status = script_status(0, 500, '搜索不到群聊', '') reboot_device(device_serial, '搜索不到群聊') return status time.sleep(random_add_subtract(sleep_time)) # 编辑框 loggerKit.info('thread[{0}=>{1}], 设备号:{2}, 输入编辑框文本', threading.current_thread().name, threading.get_ident(), device_serial) if device.xpath('//*[@resource-id="com.tencent.wework:id/gsj"]').exists: device.xpath('//*[@resource-id="com.tencent.wework:id/gsj"]').click() # 内容赋值 edit = device.xpath('//*[@resource-id="com.tencent.wework:id/gsj"]') edit.set_text("@") time.sleep(random_add_subtract(sleep_time)) if device.xpath('//*[@resource-id="com.tencent.wework:id/dae"]').exists: time.sleep(random_add_subtract(sleep_time)) # 搜索框赋值 @出指定人 device.xpath('//*[@resource-id="com.tencent.wework:id/dae"]').set_text(username) # 因网络原因 部分随机等待时间可能会对程序有影响 time.sleep(random_add_subtract(sleep_time)) # 获取到搜索出的第一个人 if device.xpath('//*[@resource-id="com.tencent.wework:id/itp"]/android.view.ViewGroup[1]').exists: device.xpath('//*[@resource-id="com.tencent.wework:id/itp"]/android.view.ViewGroup[1]').click() else: status = script_status(0, 500, '@不到指定人', '') reboot_device(device_serial, '@不到指定人') return status # 内容赋值 # 使用 replace() 方法将字符串中的
替换成 \n text = content.replace("
", "\n") edit.set_text(text + "\n" + edit.text) else: status = script_status(0, 500, '@指定人元素未获取到', '') reboot_device(device_serial, '@指定人元素未获取到') return status else: status = script_status(0, 500, '编辑框元素未获取到', '') reboot_device(device_serial, '编辑框元素未获取到') return status # 发送 loggerKit.info('thread[{0}=>{1}], 设备号:{2}, 发送文案', threading.current_thread().name, threading.get_ident(), device_serial) if device.xpath('//*[@resource-id="com.tencent.wework:id/gsf"]').exists: device.xpath('//*[@resource-id="com.tencent.wework:id/gsf"]').click() else: status = script_status(0, 500, '发送按钮元素未获取到', '') reboot_device(device_serial, '发送按钮元素未获取到') return status # 退出会话 device.xpath('//*[@resource-id="com.tencent.wework:id/l67"]').click() # 退出搜索 if device.xpath('//*[@resource-id="com.tencent.wework:id/l67"]').exists: device.xpath('//*[@resource-id="com.tencent.wework:id/l67"]').click() return status # 企微群消息回复+h5 转发操作 def wework_reply_h5(device_serial, keyword, content, material_id): status = wework_reply(device_serial, keyword, content) if status.execute_status == 0: return status status = wework_h5(device_serial, keyword, material_id) return status # 企微指定人群消息回复 +h5 转发操作 def wework_appoint_reply_h5(device_serial, keyword, username, content, material_id): status = wework_appoint_reply(device_serial, keyword, username, content) if status.execute_status == 0: return status status = wework_h5(device_serial, keyword, material_id) return status # 企微群消息回复+h5 转发操作 def wework_reply_mini(device_serial, keyword, content, material_id): status = wework_reply(device_serial, keyword, content) if status.execute_status == 0: return status status = wework_mini(device_serial, keyword, material_id) return status # 企微指定人群消息回复 +h5 转发操作 def wework_appoint_reply_mini(device_serial, keyword, username, content, material_id): status = wework_appoint_reply(device_serial, keyword, username, content) if status.execute_status == 0: return status status = wework_mini(device_serial, keyword, material_id) return status # 企微群转发h5 操作 def wework_h5(device_serial, keyword, material_id): status = script_status(1, 0, '', '') # 设备连接 d = u2.connect(device_serial) # 搜索 d.xpath('//*[@resource-id="com.tencent.wework:id/l76"]').click_exists() # 关键词 d.xpath('//*[@resource-id="com.tencent.wework:id/jfi"]').click_exists() d(focused=True).set_text("qa-机器人素材") # 点击 d.xpath( '//*[@resource-id="com.tencent.wework:id/jh1"]/android.widget.RelativeLayout[2]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[' '1]/android.widget.RelativeLayout[1]').click_exists() # 加载等待 time.sleep(5) # 点解编辑按钮 d.click(0.402, 0.18) # 设置url d.send_keys(material_id) # 点击h5跳转 d.click(0.563, 0.262) # 进行转发操作 d.xpath('//*[@resource-id="com.tencent.wework:id/l6t"]').click() d.xpath('//*[@text="转发"]').click() # 搜索转发人 d.xpath('//*[@resource-id="com.tencent.wework:id/l71"]').click() # 搜索框赋值 d.xpath('//*[@resource-id="com.tencent.wework:id/jfi"]').set_text(keyword) time.sleep(1) # 点击第一个联系人 d.xpath( '//*[@resource-id="com.tencent.wework:id/c6f"]/android.widget.RelativeLayout[2]').click() # 点击发送按钮 d.xpath('//*[@resource-id="com.tencent.wework:id/ck0"]').click() # 退出 d.xpath('//*[@resource-id="com.tencent.wework:id/l67"]').click() # 返回首页 d.xpath('//*[@resource-id="com.tencent.wework:id/l67"]').click() return status # 企微群转发h5 操作 def wework_mini(device_serial, keyword, material_id): status = script_status(1, 0, '', '') # 设备连接 d = u2.connect(device_serial) # 搜索 d.xpath('//*[@resource-id="com.tencent.wework:id/l76"]').click_exists() # 关键词 d.xpath('//*[@resource-id="com.tencent.wework:id/jfi"]').click_exists() d(focused=True).set_text("qa-机器人素材") # 点击 d.xpath( '//*[@resource-id="com.tencent.wework:id/jh1"]/android.widget.RelativeLayout[' '2]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[' '1]/android.widget.RelativeLayout[1]').click_exists() # 加载等待 time.sleep(5) # 点解编辑按钮 d.click(0.402, 0.18) # 设置url d.send_keys(material_id) # 点击小程序跳转 d.click(0.563, 0.334) # 进行转发操作 d.xpath('//*[@content-desc="更多"]').click() time.sleep(2) # 进入小程序前可能会有加载时间 if d.xpath('//*[@text="转发"]').exists: d.xpath('//*[@text="转发"]').click() else: status = script_status(0, 500, '无法打开小程序' + material_id, '') return status # 搜索转发人 d.xpath('//*[@resource-id="com.tencent.wework:id/l71"]').click() # 搜索框赋值 d.xpath('//*[@resource-id="com.tencent.wework:id/jfi"]').set_text(keyword) time.sleep(1) # 点击第一个联系人 d.xpath( '//*[@resource-id="com.tencent.wework:id/c6f"]/android.widget.RelativeLayout[2]').click() # 点击发送按钮 d.xpath('//*[@resource-id="com.tencent.wework:id/ck0"]').click() # 退出 d.xpath('//*[@resource-id="com.tencent.wework:id/l67"]').click() # 返回首页 d.xpath('//*[@resource-id="com.tencent.wework:id/l67"]').click() return status # 训练营 def wework_camp(device_serial, keyword): # 连接设备 device = u2.connect(device_serial) device.healthcheck() device.screen_on() device.unlock() device.debug = False # 重启企业微信 device.app_stop("com.tencent.wework") device.app_start("com.tencent.wework") # 打开搜索框 device.xpath('//*[@resource-id="com.tencent.wework:id/l76"]').click() # 输入搜索文案 device.xpath('//*[@resource-id="com.tencent.wework:id/jfi"]').set_text(keyword) # 点击搜索所得的第一个 device.xpath('//*[@resource-id="com.tencent.wework:id/ge2"]').click() while 1: # 如果有训练营按钮 点击进入 if device.xpath('//*[@text="成长营"]').exists: device.xpath('//*[@text="成长营"]').click_exists(10) # 缓冲五秒 time.sleep(5) # h5页面无法获取到对应的xpath元素 进行坐标点击 device.click(0.168, 0.152) time.sleep(2) # 发送按钮点击 device.xpath('//*[@resource-id="com.tencent.wework:id/ck0"]').click() break # 如果有添加xpath 进入添加工具栏页面 if device.xpath('//*[@resource-id="com.tencent.wework:id/ham"]').exists: device.xpath('//*[@resource-id="com.tencent.wework:id/ham"]').click() break # 向右滑动继续获取 parent_view = device.xpath('//*[@resource-id="com.tencent.wework:id/fbf"]') parent_view.swipe("left") def invite_demo(device_serial, key_word, user_name, content): # 编辑群页面最多展示的群人数 max_count = 20 add_people_xpath = '//*[@resource-id="com.tencent.wework:id/edb"]/android.widget.LinearLayout[{}]/android.widget.ImageView[1]' # 设备连接 device = u2.connect(device_serial) # 重启企业微信 device.app_stop("com.tencent.wework") device.app_start("com.tencent.wework") # 打开搜索框 device.xpath('//*[@resource-id="com.tencent.wework:id/l76"]').click() # 输入搜索文案 device.xpath('//*[@resource-id="com.tencent.wework:id/jfi"]').set_text(key_word) # 点击搜索所得的第一个 device.xpath('//*[@resource-id="com.tencent.wework:id/ge2"]').click() # 点击编辑群按钮 device.xpath('//*[@resource-id="com.tencent.wework:id/l6w"]').click() # 获取群人数 people_count = device.xpath('//*[@resource-id="com.tencent.wework:id/eb_"]').get_text() # 获取到群人数 people_count = int(people_count.replace("人", "")) if people_count >= max_count: people_count = max_count else: people_count = people_count + 1 add_people_xpath = add_people_xpath.format(people_count) # 进入选择联系人页面 device.xpath(add_people_xpath).click() # 选择企业通讯录 device.xpath('//*[@text="企业通讯录"]').click() # 点击搜索按钮 device.xpath( '//*[@resource-id="com.tencent.wework:id/jlj"]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[1]/android.widget.LinearLayout[2]/android.widget.RelativeLayout[1]/android.widget.TextView[1]').click() # 搜索需添加人员 device.xpath('//*[@resource-id="com.tencent.wework:id/jfi"]').set_text(user_name) # 选中第一个匹配人员 device.xpath( '//*[@resource-id="com.tencent.wework:id/jlj"]/android.widget.RelativeLayout[1]/android.widget.ListView[1]/android.widget.RelativeLayout[2]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[2]').click() # 确定添加人员 if device.xpath('//*[@resource-id="com.tencent.wework:id/jlf"]').exists: # 如果有确认按钮 确保该群中之前没有该用户 device.xpath('//*[@resource-id="com.tencent.wework:id/jlf"]').click() device.xpath('//*[@resource-id="com.tencent.wework:id/l67"]').click() else: # 如果没有确认按钮 确保该群众已经有了该用户 device.xpath('//*[@resource-id="com.tencent.wework:id/l67"]').click() device.xpath('//*[@resource-id="com.tencent.wework:id/l67"]').click() device.xpath('//*[@resource-id="com.tencent.wework:id/l67"]').click() device.xpath('//*[@resource-id="com.tencent.wework:id/l67"]').click() # 编辑框 device.xpath('//*[@resource-id="com.tencent.wework:id/gsj"]').click() # 内容赋值 device.xpath('//*[@resource-id="com.tencent.wework:id/gsj"]').set_text(content) # 发送 device.xpath('//*[@resource-id="com.tencent.wework:id/gsf"]').click() # 点击编辑群按钮 device.xpath('//*[@resource-id="com.tencent.wework:id/l6w"]').click() # 退出群聊 while 1: if device.xpath('//*[@resource-id="com.tencent.wework:id/jtt"]').exists: device.xpath('//*[@resource-id="com.tencent.wework:id/jtt"]').click() break else: device.swipe_ext('up', scale=1) # 确定退出群聊 device.xpath('//*[@resource-id="com.tencent.wework:id/b42"]').click() def create_group(device_serial, robot_name, customer_name, group_name, content): # 设备连接 device = u2.connect(device_serial) # 打开添加按钮 device.xpath('//*[@resource-id="com.tencent.wework:id/l6w"]').click() # 点击发起群聊按钮 device.xpath('//*[@text="发起群聊"]').click() # 点击搜索按钮 开始搜索人员 device.xpath('//*[@resource-id="com.tencent.wework:id/l7b"]').click() # 输入机器人姓名 device.xpath('//*[@resource-id="com.tencent.wework:id/jfi"]').set_text(robot_name) time.sleep(1) if device.xpath( '//*[@resource-id="com.tencent.wework:id/itp"]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.ImageView[1]').exists: device.xpath( '//*[@resource-id="com.tencent.wework:id/itp"]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[1]/android.widget.RelativeLayout[1]/android.widget.FrameLayout[1]/android.widget.ImageView[1]').click() # 重置搜索框 device.xpath('//*[@resource-id="com.tencent.wework:id/jfh"]').click() # 输入机器人姓名 device.xpath('//*[@resource-id="com.tencent.wework:id/jfi"]').set_text(customer_name) time.sleep(1) # 输入客户名称 if device.xpath('//*[@resource-id="com.tencent.wework:id/jk0"]').exists: device.xpath('//*[@resource-id="com.tencent.wework:id/jk0"]').click() # 点击确认按钮 device.xpath('//*[@resource-id="com.tencent.wework:id/jjx"]').click() # 点击编辑群信息 device.xpath('//*[@resource-id="com.tencent.wework:id/l6w"]').click() # 点击群聊名称 device.xpath('//*[@text="群聊名称"]').click() # 设置群名称 device.xpath('//*[@resource-id="com.tencent.wework:id/egb"]').set_text(group_name) # 确定修改 device.xpath('//*[@resource-id="com.tencent.wework:id/l73"]').click() # 退出群编辑 device.xpath('//*[@resource-id="com.tencent.wework:id/l67"]').click() # 编辑框 device.xpath('//*[@resource-id="com.tencent.wework:id/gsj"]').click() # 内容赋值 device.xpath('//*[@resource-id="com.tencent.wework:id/gsj"]').set_text(content) # 发送 device.xpath('//*[@resource-id="com.tencent.wework:id/gsf"]').click() # 点击编辑群按钮 device.xpath('//*[@resource-id="com.tencent.wework:id/l6w"]').click() # 退出群聊 while 1: if device.xpath('//*[@resource-id="com.tencent.wework:id/jtt"]').exists: device.xpath('//*[@resource-id="com.tencent.wework:id/jtt"]').click() break else: device.swipe_ext('up', scale=1) # 确定退出群聊 device.xpath('//*[@resource-id="com.tencent.wework:id/b42"]').click()