# -*- coding: utf-8 -*-
"""Proof-of-concept crawler for the Dongchedi owner-circle feeds.

The module drives the Dongchedi Android app (package ``com.ss.android.auto``)
through uiautomator2, opens the owner circle of a given car and scrapes the
post lists shown under its tabs.
"""
import json
import re
import time
from collections import OrderedDict
from datetime import datetime, timedelta
from enum import Enum

import uiautomator2
from flask import jsonify  # noqa: F401  (unused here; kept so existing imports of this name keep working)

from tools import loggerKit

# Android package name of the Dongchedi app.
_APP_PACKAGE = "com.ss.android.auto"
# Date format shared by the task cut-off date and the normalised publish dates.
_DATE_FORMAT = '%Y-%m-%d'
# Upper bound on swipes while waiting for a post body to appear on screen, so a
# list that never shows one cannot block the crawl forever.
_MAX_CONTENT_WAIT_SWIPES = 50


class account_circle_poc(Enum):
    """Car models whose owner circle can be crawled."""

    R7 = "R7"
    F7 = "F7"

    def name(self):
        """Return the car name typed into the in-app search box.

        NOTE: this is a method, so it must be called as ``member.name()``.
        Every member other than ``R7`` maps to the F7 search term.
        """
        return "飞凡R7" if self == account_circle_poc.R7 else "飞凡F7"


# Primary / secondary tab currently being crawled (None when idle).
current_primary_tab = None
current_secondary_tab = None


def crawl_data(device_id, car_type: account_circle_poc, task_transfer_time, sequence):
    """Crawl the owner circle of ``car_type`` on ``device_id`` and log the result.

    :param device_id: device the task runs on (passed to ``uiautomator2.connect``)
    :param car_type: which car's owner circle to crawl
    :param task_transfer_time: cut-off date as a ``'%Y-%m-%d'`` string
    :param sequence: task ordinal; used to skip posts already taken by earlier
        tasks when a fixed number of posts is collected
    """
    data = __get_circle_data(device_id, car_type.name(), task_transfer_time, sequence)
    today = datetime.now().strftime(_DATE_FORMAT)
    # json.dumps yields plain JSON text and does not need a Flask application
    # context (flask.jsonify does, and returns a Response object).
    loggerKit.info("data:{0}, date:{1}", json.dumps(data, ensure_ascii=False), today)


def _sort_by_publish_time(d):
    """Open the filter on the current tab and choose "sort by publish time"."""
    d(resourceId="com.ss.android.auto:id/jy8", text="全部").click()
    d(resourceId="com.ss.android.auto:id/io3", text="按发布时间排序").click()


def _post_list(d):
    """Return the selector of the RecyclerView that holds the post list."""
    return d(resourceId="com.ss.android.auto:id/hh0",
             className="androidx.recyclerview.widget.RecyclerView")


def __get_circle_data(device_id, car_name, task_transfer_time, sequence):
    """Scrape the Dongchedi owner-circle data of one car.

    :param device_id: device the task runs on
    :param car_name: car name used to find the owner circle
    :param task_transfer_time: cut-off date as a ``'%Y-%m-%d'`` string
    :param sequence: task ordinal, forwarded to the list parser
    :return: dict with keys ``"dongtai"`` (itself a dict with ``"hot"``,
        ``"all"`` and ``"essence"``), ``"answer"`` and ``"opinions"``; every
        leaf is a list of post dicts
    """
    global current_primary_tab, current_secondary_tab

    d = uiautomator2.connect(device_id)
    try:
        d.debug = False
        d.screen_on()
        d.unlock()
        d.press("recent")
        # Restart the Dongchedi app.
        d.app_stop(_APP_PACKAGE)
        d.app_start(_APP_PACKAGE, ".policy.AutoPrivacyActivity")
        time.sleep(5)
        # Dismiss the app-upgrade reminder dialog if it shows up.
        upgrade_dismiss = d(resourceId="com.ss.android.auto:id/dtq")
        if upgrade_dismiss.wait(1):
            upgrade_dismiss.click()
        # Open the owner-circle entry in the bottom tab bar.
        d.xpath(
            '//*[@resource-id="android:id/tabs"]/android.widget.RelativeLayout[2]/android.widget.LinearLayout[1]'
        ).click()
        time.sleep(2)
        # Search for the car's circle and enter it.
        d(resourceId="com.ss.android.auto:id/ipm").click()
        d(resourceId="com.ss.android.auto:id/gt8").set_text(car_name)
        d(resourceId="com.ss.android.auto:id/fyw").click()
        time.sleep(2)
        d(text="车友圈").click()
        d(text="进入车友圈").click()

        # (tab selector, tab label, key in the returned dict)
        tabs = [
            # Feed
            (d.xpath(
                '//*[@resource-id="com.ss.android.auto:id/ce8"]/android.widget.LinearLayout[1]/android.widget.ImageView[1]'),
             "动态", "dongtai"),
            # Q&A
            (d.xpath(
                '//*[@resource-id="com.ss.android.auto:id/ce8"]/android.widget.LinearLayout[2]/android.widget.ImageView[1]'),
             "问答", "answer"),
            # Reviews
            (d.xpath(
                '//*[@resource-id="com.ss.android.auto:id/ce8"]/android.widget.LinearLayout[3]/android.widget.ImageView[1]'),
             "口碑", "opinions"),
        ]

        data = dict()
        for tab_selector, tab_label, tab_key in tabs:
            current_primary_tab = tab_label
            current_secondary_tab = None
            tab_selector.click()
            time.sleep(3)
            if tab_label == "动态":
                # The feed tab has secondary tabs.
                sub_data = dict()
                for sub_label, sub_key in (("热门", "hot"), ("全部", "all"), ("精华", "essence")):
                    current_secondary_tab = sub_label
                    d(resourceId="com.ss.android.auto:id/jy8", text=sub_label).click()
                    time.sleep(3)
                    if sub_label == "全部":
                        _sort_by_publish_time(d)
                    recycler_view = _post_list(d)
                    time.sleep(2)
                    # Hot posts: always collect 10, after skipping the posts
                    # that belong to earlier task ordinals.
                    fixed_count = 10 if sub_label == "热门" else 0
                    sub_data[sub_key] = __parse_dongchedi_circle_list(
                        recycler_view, d, task_transfer_time, fixed_count, sequence)
                    time.sleep(1)
                data[tab_key] = sub_data
            else:
                if tab_label == "问答":
                    _sort_by_publish_time(d)
                recycler_view = _post_list(d)
                time.sleep(2)
                data[tab_key] = __parse_dongchedi_circle_list(
                    recycler_view, d, task_transfer_time, 0, sequence)
                time.sleep(1)
        return data
    finally:
        # Always clear the tab trackers and close the app, even when a UI step
        # above raised, so a failed run does not leak state into the next one.
        current_primary_tab = None
        current_secondary_tab = None
        d.app_stop(_APP_PACKAGE)


def _swipe_up(d, scale=0.2):
    """Scroll the list up by ``scale`` of the screen; swipes sometimes fail, so retry."""
    return __retry(5, lambda: d.swipe_ext("up", scale))


def _wait_for_post_content(d):
    """Scroll until a post body view is on screen.

    :return: True when a body view showed up, False after
        ``_MAX_CONTENT_WAIT_SWIPES`` unsuccessful swipes
    """
    for _ in range(_MAX_CONTENT_WAIT_SWIPES):
        if d(resourceId="com.ss.android.auto:id/jdr").wait(timeout=1):
            return True
        _swipe_up(d)
    return bool(d(resourceId="com.ss.android.auto:id/jdr").wait(timeout=1))


def _view_text(view, default=""):
    """Return ``view.get_text()``, or ``default`` when the view was not found."""
    return default if view is None else view.get_text()


def __parse_dongchedi_circle_list(listview, d: uiautomator2.Device, task_transfer_time, get_count, sequence):
    """Scroll through a post list and collect its posts.

    :param listview: selector of the list container; an absent list yields ``[]``
    :param d: connected uiautomator2 device
    :param task_transfer_time: cut-off date as a ``'%Y-%m-%d'`` string; when
        ``get_count <= 0`` posts newer than it are scrolled past and the first
        older post ends the crawl
    :param get_count: ``> 0`` collects exactly that many posts (publish dates
        are not compared); ``<= 0`` collects by date
    :param sequence: task ordinal; in fixed-count mode the first
        ``(sequence - 1) * get_count`` distinct posts are skipped. Non-int
        values are treated as 1
    :return: list of dicts with keys ``nickName``, ``title``, ``publishDate``,
        ``shareNum`` and ``commentNum``, in crawl order
    """
    if not listview.exists:
        return []

    transfer_date = datetime.strptime(task_transfer_time, _DATE_FORMAT).date()
    ret = OrderedDict()
    if not isinstance(sequence, int):
        sequence = 1
    skip_count = (sequence - 1) * get_count
    skip_cache = set()

    while True:
        # The list footer reads "no more items" once the end is reached.
        if d.xpath('//*[@text="没有更多了"]').exists:
            break
        cell = listview.child(resourceId="com.ss.android.auto:id/dvd",
                              className="android.widget.LinearLayout")
        if not cell.exists:
            break
        # Post body; give up instead of spinning forever if none ever appears.
        if not _wait_for_post_content(d):
            break
        content_view = d(resourceId="com.ss.android.auto:id/jdr")
        content = content_view.get_text()[:20]

        # Fixed-count mode: skip the posts that belong to earlier ordinals.
        if get_count > 0 and skip_count > 0:
            if content not in skip_cache:
                skip_count -= 1
                skip_cache.add(content)
            _swipe_up(d)
            continue
        if content in skip_cache:
            _swipe_up(d)
            continue

        # Posts are de-duplicated by the first 20 characters of their body.
        cache_key = content
        if cache_key in ret:
            _swipe_up(d)
            continue

        # Publish time.
        time_view = __find_view(
            d, "down", lambda: content_view.down(resourceId="com.ss.android.auto:id/ihu"))
        if time_view is None:
            _swipe_up(d)
            time.sleep(2)
            continue
        time_str = __formate_time(time_view.get_text().split("最后回复")[0])
        if time_str is None:
            # Unrecognised time label: skip the post rather than crash the crawl.
            _swipe_up(d)
            continue

        if get_count <= 0:
            # Publish dates are only compared when not collecting a fixed count.
            published_date = datetime.strptime(time_str, _DATE_FORMAT).date()
            if published_date > transfer_date:
                # Post is newer than the cut-off: scroll past it.
                scale = 0.2
                if current_primary_tab == "问答":
                    # Q&A may miss items, so scroll faster the further ahead we are;
                    # other tabs must not miss items and keep the small step.
                    ahead = published_date - transfer_date
                    if ahead > timedelta(days=2):
                        scale = 1
                    elif ahead > timedelta(days=1):
                        scale = 0.5
                _swipe_up(d, scale)
                continue
            if published_date < transfer_date:
                # Post is older than the cut-off: nothing further to collect.
                break

        # Nickname; empty when the view cannot be located.
        nickname = _view_text(__find_view(
            d, "up", lambda: content_view.up(resourceId="com.ss.android.auto:id/v")))
        # Share count; 0 when the view is missing or not numeric.
        share_count = __intValue(_view_text(__find_view(
            d, "down", lambda: content_view.down(resourceId="com.ss.android.auto:id/jte")), None))
        # Comment count; 0 when the view is missing or not numeric.
        comment_count = __intValue(_view_text(__find_view(
            d, "down", lambda: content_view.down(resourceId="com.ss.android.auto:id/i9j")), None))
        # Title; a post may have none, so no scrolling retry here.
        title = ""
        title_view = content_view.up(resourceId="com.ss.android.auto:id/t",
                                     className="android.widget.TextView")
        if title_view is not None:
            title_text = title_view.get_text()
            if title_text != "车主评分":
                title = title_text

        item = {
            'nickName': nickname,
            'title': content if title == "" else title,
            'publishDate': time_str,
            'shareNum': share_count,
            'commentNum': comment_count,
        }
        ret[cache_key] = item
        print(f"nickname = {nickname}, share_count = {share_count}, comment_count = {comment_count}, "
              f"content = {content}, title = {title}, time = {time_str}")

        if 0 < get_count <= len(ret):
            # Fixed-count mode reached its target.
            break
        _swipe_up(d)
        time.sleep(1)

    return list(ret.values())


def __retry(times, operation):
    """Call ``operation`` until it succeeds, at most ``times`` times.

    Failures are printed and followed by a growing pause (1s, 3s, 5s, ...).
    Exhausting all attempts does not raise.

    :return: True if a call succeeded, False otherwise
    """
    sleep_time = 1
    while times > 0:
        try:
            operation()
            return True
        except Exception as e:
            print("retry exception: \n")
            print(e)
            times -= 1
            time.sleep(sleep_time)
            sleep_time += 2
    return False


def __intValue(value_str):
    """Convert ``value_str`` to int, returning 0 when it is not an integer literal."""
    try:
        return int(value_str)
    except (TypeError, ValueError):
        return 0


def __find_view(d: uiautomator2.Device, direction, op):
    """Evaluate ``op`` and, while it yields None, scroll a little and try again.

    :param d: connected uiautomator2 device
    :param direction: side on which the view is expected (``"up"`` or
        ``"down"``); the list is swiped the opposite way. Any other value
        disables scrolling
    :param op: zero-argument callable returning the view or None
    :return: the view, or None if it was not found after 10 scrolls or an
        error occurred
    """
    try:
        ret = op()
        retry_count = 10
        while ret is None and retry_count > 0 and direction in ("up", "down"):
            retry_count -= 1
            __retry(5, lambda: d.swipe_ext("up" if direction == "down" else "down", 0.1))
            time.sleep(1)
            ret = op()
        return ret
    except Exception as e:
        print("find view exception: \n")
        print(e)
        return None


def _count_before(text, unit):
    """Return the integer written directly before ``unit`` in ``text``.

    Whitespace between the number and the unit is allowed.

    :raises ValueError: when no such number exists
    """
    match = re.search(r'(\d+)\s*' + re.escape(unit), text)
    if match is None:
        raise ValueError(f"no count before {unit!r} in {text!r}")
    return int(match.group(1))


def _latest_past_date(month, day, now):
    """Return the most recent ``month``/``day`` that is not after ``now``.

    Labels without a year are assumed to be in the past, so a December date
    read in January resolves to the previous year.

    :return: a datetime, or None when month/day never form a valid date
    """
    for year in range(now.year, now.year - 8, -1):
        try:
            candidate = datetime(year, month, day)
        except ValueError:
            # Invalid in this year (e.g. Feb 29) or invalid altogether.
            continue
        if candidate.date() <= now.date():
            return candidate
    return None


def _parse_cn_date(text, now):
    """Parse ``[YYYY年]M月D日`` found anywhere in ``text``; None when absent."""
    match = re.search(r'(?:(\d{4})年)?(\d{1,2})月(\d{1,2})日', text)
    if match is None:
        return None
    year, month, day = match.groups()
    if year is not None:
        return datetime(int(year), int(month), int(day))
    return _latest_past_date(int(month), int(day), now)


def _parse_dashed_date(token, now):
    """Parse ``MM-DD`` or ``YYYY-MM-DD``; None for any other number of parts."""
    items = token.split("-")
    if len(items) == 2:
        return _latest_past_date(int(items[0]), int(items[1]), now)
    if len(items) == 3:
        return datetime.strptime(token, _DATE_FORMAT)
    return None


def __formate_time(time_str):
    """Normalise a publish-time label to a ``'%Y-%m-%d'`` string.

    Handled labels: "刚刚", "N分钟", "N小时", "昨天", "前天", "今天", "N天",
    "N周", "[YYYY年]M月D日", "MM-DD" and "YYYY-MM-DD"; trailing text such as a
    clock time is ignored. Relative labels are resolved against the current
    local time.

    :param time_str: label text read from the app
    :return: the date string, or None when the label cannot be interpreted
    """
    if not isinstance(time_str, str):
        return None
    text = time_str.strip()
    if not text:
        return None

    now = datetime.now()
    try:
        if '刚刚' in text:
            p_time = now
        elif '分钟' in text:
            p_time = now - timedelta(minutes=_count_before(text, '分钟'))
        elif '小时' in text:
            p_time = now - timedelta(hours=_count_before(text, '小时'))
        elif '昨天' in text:
            p_time = now - timedelta(days=1)
        elif '前天' in text:
            p_time = now - timedelta(days=2)
        elif '今天' in text:
            p_time = now
        elif '天' in text:
            p_time = now - timedelta(days=_count_before(text, '天'))
        elif '周' in text:
            p_time = now - timedelta(weeks=_count_before(text, '周'))
        elif '月' in text and '日' in text:
            p_time = _parse_cn_date(text, now)
        else:
            p_time = _parse_dashed_date(text.split()[0], now)
    except (ValueError, OverflowError):
        return None

    if p_time is None:
        return None
    return p_time.strftime(_DATE_FORMAT)