# -*- coding: utf-8 -*-
"""
Proof-of-concept scraper for the Dongchedi (懂车帝) car-owner circle.

The module drives the Dongchedi Android app through uiautomator2, walks the
tabs of one car-owner circle and collects the posts shown in each list.
"""
import json
import time
from collections import OrderedDict
from datetime import datetime, timedelta
from enum import Enum

import uiautomator2
from flask import jsonify  # noqa: F401  (kept so existing importers of this name keep working)

from tools import loggerKit

# Upper bound on "wait one second, then swipe" rounds spent looking for a post
# content view before the list is considered exhausted.
_MAX_CONTENT_WAIT_ATTEMPTS = 30


class account_circle_poc(Enum):
    """Car models whose owner circle can be crawled."""

    R7 = "R7"
    F7 = "F7"

    def name(self):
        """Return the text typed into the app's search box for this model.

        NOTE: this deliberately keeps the original interface, which shadows
        ``Enum.name``; it has to be called as a method (``member.name()``).
        """
        return "飞凡R7" if self == account_circle_poc.R7 else "飞凡F7"


# Primary / secondary tab currently being crawled (None when idle).
current_primary_tab = None
current_secondary_tab = None


def crawl_data(device_id, car_type: account_circle_poc, task_transfer_time, sequence):
    """Crawl the owner circle of ``car_type`` on ``device_id`` and log the result.

    :param device_id: identifier passed to ``uiautomator2.connect``
    :param car_type: which car model's circle to crawl
    :param task_transfer_time: cut-off date as a ``YYYY-MM-DD`` string
    :param sequence: 1-based task ordinal; pages the fixed-size "hot" list
    :return: the crawled data, keyed by tab
    """
    data = __get_circle_data(device_id, car_type.name(), task_transfer_time, sequence)
    date = datetime.now().strftime('%y-%m-%d')
    # json.dumps works without a Flask application context and logs the actual
    # payload instead of the repr of a Response object.
    loggerKit.info("data:{0}, date:{1}", json.dumps(data, ensure_ascii=False), str(date))
    return data


def __sort_by_publish_time(d):
    """Switch the visible post list to "sort by publish time".

    Taps the "全部" entry and then the "按发布时间排序" option (presumably the first
    tap opens the sort menu - confirm against the app UI).
    """
    d(resourceId="com.ss.android.auto:id/jy8", text="全部").click()
    d(resourceId="com.ss.android.auto:id/io3", text="按发布时间排序").click()


def __post_list_view(d):
    """Return the selector of the RecyclerView that holds the post list."""
    return d(resourceId="com.ss.android.auto:id/hh0",
             className="androidx.recyclerview.widget.RecyclerView")


def __get_circle_data(device_id, car_name, task_transfer_time, sequence):
    """
    Crawl the Dongchedi owner-circle data for one car.

    :param device_id: device that executes the task
    :param car_name: car name used to find the owner circle
    :param task_transfer_time: cut-off date (``YYYY-MM-DD``) for the data
    :param sequence: 1-based task ordinal forwarded to the list parser
    :return: dict with keys ``dongtai`` (itself a dict with ``hot`` / ``all`` /
        ``essence``), ``answer`` and ``opinions``; every leaf is a list of posts
    """
    global current_primary_tab, current_secondary_tab

    d = uiautomator2.connect(device_id)
    d.debug = False
    d.screen_on()
    d.unlock()
    d.press("recent")
    # Restart the Dongchedi app.
    d.app_stop("com.ss.android.auto")
    d.app_start("com.ss.android.auto", ".policy.AutoPrivacyActivity")
    time.sleep(5)
    # Dismiss the "upgrade available" dialog if it shows up.  The timeout must
    # be passed by keyword: the first positional parameter of wait() is
    # ``exists``, so ``wait(1)`` would wait for the library default timeout.
    upgrade_dismiss = d(resourceId="com.ss.android.auto:id/dtq")
    if upgrade_dismiss.wait(timeout=1):
        upgrade_dismiss.click()
    # Open the owner-circle entry of the bottom tab bar.
    d.xpath(
        '//*[@resource-id="android:id/tabs"]/android.widget.RelativeLayout[2]/android.widget.LinearLayout[1]').click()
    time.sleep(2)
    # Search for the car's interest circle and enter it.
    d(resourceId="com.ss.android.auto:id/ipm").click()
    d(resourceId="com.ss.android.auto:id/gt8").set_text(car_name)
    d(resourceId="com.ss.android.auto:id/fyw").click()
    time.sleep(2)
    d(text="车友圈").click()
    d(text="进入车友圈").click()

    # (tab selector, tab title shown in the app, key used in the result dict)
    tabs = [
        # Feed
        (d.xpath(
            '//*[@resource-id="com.ss.android.auto:id/ce8"]/android.widget.LinearLayout[1]/android.widget.ImageView[1]'),
         "动态", "dongtai"),
        # Q&A
        (d.xpath(
            '//*[@resource-id="com.ss.android.auto:id/ce8"]/android.widget.LinearLayout[2]/android.widget.ImageView[1]'),
         "问答", "answer"),
        # Reviews
        (d.xpath(
            '//*[@resource-id="com.ss.android.auto:id/ce8"]/android.widget.LinearLayout[3]/android.widget.ImageView[1]'),
         "口碑", "opinions"),
    ]
    data = dict()
    try:
        for tab_view, tab_title, tab_key in tabs:
            current_primary_tab = tab_title
            current_secondary_tab = None
            tab_view.click()
            time.sleep(3)
            if tab_title == "动态":
                # The feed tab has secondary tabs.
                sub_data = dict()
                for sub_title, sub_key in (("热门", "hot"), ("全部", "all"), ("精华", "essence")):
                    current_secondary_tab = sub_title
                    d(resourceId="com.ss.android.auto:id/jy8", text=sub_title).click()
                    time.sleep(3)
                    if sub_title == "全部":
                        __sort_by_publish_time(d)
                    recycler_view = __post_list_view(d)
                    time.sleep(2)
                    # The hot list is crawled in fixed pages of 10 posts; the
                    # task ordinal decides how many posts are skipped first.
                    sub_data[sub_key] = __parse_dongchedi_circle_list(
                        recycler_view, d, task_transfer_time,
                        10 if sub_title == "热门" else 0, sequence)
                    time.sleep(1)
                data[tab_key] = sub_data
            else:
                if tab_title == "问答":
                    __sort_by_publish_time(d)
                recycler_view = __post_list_view(d)
                time.sleep(2)
                data[tab_key] = __parse_dongchedi_circle_list(recycler_view, d, task_transfer_time, 0, sequence)
                time.sleep(1)
    finally:
        # Never leave stale tab markers behind, even when the crawl fails.
        current_primary_tab = None
        current_secondary_tab = None
    d.app_stop("com.ss.android.auto")
    return data


def __view_text(view, default=""):
    """Return ``view.get_text()``, or ``default`` when the view was not found."""
    return default if view is None else view.get_text()


def __parse_dongchedi_circle_list(listview, d: uiautomator2.Device, task_transfer_time, get_count, sequence):
    """Scroll through a post list and collect its posts.

    :param listview: selector of the list that holds the posts
    :param d: connected device
    :param task_transfer_time: cut-off date (``YYYY-MM-DD``); only used when
        ``get_count <= 0``: newer posts are scrolled past, posts of exactly that
        date are collected, and the first older post ends the crawl
    :param get_count: when > 0, collect exactly this many posts after skipping
        ``(sequence - 1) * get_count`` posts, ignoring publish dates
    :param sequence: 1-based task ordinal; non-int values are treated as 1
    :return: list of dicts with keys ``nickName``, ``title``, ``publishDate``,
        ``shareNum`` and ``commentNum``
    :raises ValueError: if ``task_transfer_time`` is not ``YYYY-MM-DD``
    """
    if not listview.exists:
        return []

    transfer_date = datetime.strptime(task_transfer_time, '%Y-%m-%d').date()
    ret = OrderedDict()
    # get_count > 0 means a fixed number of posts is wanted; earlier tasks
    # already took the first pages, so skip them.
    if not isinstance(sequence, int):
        sequence = 1
    skip_count = (sequence - 1) * get_count
    skip_cache = set()

    def swipe_up(scale=0.2):
        # Scrolling the list occasionally fails, hence the retry wrapper.
        __retry(5, lambda: d.swipe_ext("up", scale))

    while True:
        if d.xpath('//*[@text="没有更多了"]').exists:
            break
        cell = listview.child(resourceId="com.ss.android.auto:id/dvd", className="android.widget.LinearLayout")
        if not cell.exists:
            break

        # Post content: scroll until a content view is on screen, but give up
        # after a bounded number of rounds instead of looping forever.
        content_view = d(resourceId="com.ss.android.auto:id/jdr")
        for _ in range(_MAX_CONTENT_WAIT_ATTEMPTS):
            if content_view.wait(timeout=1):
                break
            swipe_up()
        else:
            break
        # The first 20 characters of the content identify a post.
        content = content_view.get_text()[:20]

        # Skip the requested number of distinct posts.
        if get_count > 0 and skip_count > 0:
            if content not in skip_cache:
                skip_count -= 1
                skip_cache.add(content)
            swipe_up()
            continue
        # Ignore posts that were skipped on purpose or already collected.
        if content in skip_cache or content in ret:
            swipe_up()
            continue

        # Publish time
        time_view = __find_view(d, "down", lambda: content_view.down(resourceId="com.ss.android.auto:id/ihu"))
        if time_view is None:
            swipe_up()
            time.sleep(2)
            continue
        time_str = __formate_time(time_view.get_text().split("最后回复")[0])

        if get_count <= 0:
            # Publish dates only matter when not crawling a fixed count.
            if time_str is None:
                # The date could not be parsed, so the post cannot be compared
                # with the cut-off date; move on instead of crashing.
                swipe_up()
                continue
            published_date = datetime.strptime(time_str, '%Y-%m-%d').date()
            if published_date > transfer_date:
                # Post is newer than the target date: keep scrolling.
                scale = 0.2
                if current_primary_tab == "问答":
                    # Missing a few posts is acceptable here, so scroll faster
                    # the further away the target date is.
                    distance = published_date - transfer_date
                    if distance > timedelta(days=2):
                        scale = 1
                    elif distance > timedelta(days=1):
                        scale = 0.5
                swipe_up(scale)
                continue
            if published_date < transfer_date:
                # Post is older than the target date: done.
                break

        # Nickname
        nickname = __view_text(
            __find_view(d, "up", lambda: content_view.up(resourceId="com.ss.android.auto:id/v")))
        # Share count
        share_count = __intValue(__view_text(
            __find_view(d, "down", lambda: content_view.down(resourceId="com.ss.android.auto:id/jte"))))
        # Comment count
        comment_count = __intValue(__view_text(
            __find_view(d, "down", lambda: content_view.down(resourceId="com.ss.android.auto:id/i9j"))))
        # Title (a post may have none, so no scrolling retry here)
        title = ""
        title_view = content_view.up(resourceId="com.ss.android.auto:id/t", className="android.widget.TextView")
        if title_view is not None:
            title_text = title_view.get_text()
            if title_text != "车主评分":
                title = title_text

        ret[content] = {
            'nickName': nickname,
            'title': content if title == "" else title,
            'publishDate': time_str,
            'shareNum': share_count,
            'commentNum': comment_count,
        }
        print(f"nickname = {nickname}, share_count = {share_count}, comment_count = {comment_count}, "
              f"content = {content}, title = {title}, time = {time_str}")

        if 0 < get_count <= len(ret):
            # Fixed-count crawl reached the requested number of posts.
            break
        swipe_up()
        time.sleep(1)

    return list(ret.values())


def __retry(times, operation):
    """Call ``operation`` until it succeeds, at most ``times`` times.

    Failures are printed and followed by a growing pause (1s, 3s, 5s, ...).
    The helper is best-effort: it never raises.

    :return: True if a call succeeded, False if every attempt failed
    """
    sleep_time = 1
    while times > 0:
        try:
            operation()
            return True
        except Exception as e:
            print("retry exception: \n")
            print(e)
            times -= 1
            time.sleep(sleep_time)
            sleep_time += 2
    return False


def __intValue(value_str):
    """Convert ``value_str`` to int, returning 0 when it is not a plain integer."""
    try:
        return int(value_str)
    except (TypeError, ValueError):
        return 0


def __find_view(d: uiautomator2.Device, direction, op):
    """Look a view up with ``op``, scrolling to bring it on screen if needed.

    :param d: connected device
    :param direction: "down" or "up" - where the view is expected relative to
        its anchor; the screen is swiped the opposite way between lookups.
        Any other value disables scrolling.
    :param op: zero-argument callable returning the view or None
    :return: the view, or None if it was not found after 10 scroll attempts or
        if the lookup raised
    """
    try:
        ret = op()
        if direction not in ("up", "down"):
            return ret
        swipe_direction = "up" if direction == "down" else "down"
        retry_count = 10
        while ret is None and retry_count > 0:
            retry_count -= 1
            __retry(5, lambda: d.swipe_ext(swipe_direction, 0.1))
            time.sleep(1)
            ret = op()
        return ret
    except Exception:
        return None


def __resolve_month_day(month_day, today):
    """Turn a year-less ``M月D日`` string into a date that is not in the future.

    The current year is tried first; if that date does not exist or lies more
    than one day ahead of ``today`` (posts cannot come from the future, one day
    of slack covers clock differences), the previous year is used.

    :return: a ``date``, or None if the text cannot be resolved
    """
    latest_allowed = today + timedelta(days=1)
    for year in (today.year, today.year - 1):
        try:
            candidate = datetime.strptime(f'{year}年{month_day}', "%Y年%m月%d日").date()
        except ValueError:
            continue
        if candidate <= latest_allowed:
            return candidate
    return None


def __formate_time(time_str):
    """Normalise a publish-time label of the app to ``YYYY-MM-DD``.

    Handles relative labels ("刚刚", "N分钟前", "N小时前", "昨天", "前天", "N天前",
    "N周前"), year-less dates ("M月D日", "MM-DD") and full dates ("YYYY-MM-DD",
    optionally followed by a time).

    :return: the date string, or None when the label cannot be interpreted
    """
    if not time_str:
        return None
    time_str = time_str.strip()
    if not time_str:
        return None

    now = datetime.now()
    today = now.date()
    try:
        if '刚刚' in time_str:
            p_date = today
        elif '分钟' in time_str:
            minutes = int(time_str[:time_str.index('分')])
            p_date = (now - timedelta(minutes=minutes)).date()
        elif '小时' in time_str:
            hours = int(time_str[:time_str.index('小')])
            p_date = (now - timedelta(hours=hours)).date()
        elif '昨天' in time_str:
            p_date = today - timedelta(days=1)
        elif '前天' in time_str:
            p_date = today - timedelta(days=2)
        elif '天' in time_str:
            days = int(time_str[:time_str.index('天')])
            p_date = today - timedelta(days=days)
        elif '周' in time_str:
            # Everything before "周" is the number; int() tolerates surrounding
            # whitespace, so both "2周前" and "2 周前" parse.
            weeks = int(time_str[:time_str.index('周')])
            p_date = today - timedelta(weeks=weeks)
        elif '年' not in time_str and '月' in time_str and '日' in time_str:
            # Drop anything after "日" (e.g. a trailing time of day).
            p_date = __resolve_month_day(time_str[:time_str.index('日') + 1], today)
        else:
            items = time_str.split()[0].split("-")
            if len(items) == 2:
                p_date = __resolve_month_day(f'{items[0]}月{items[1]}日', today)
            elif len(items) == 3:
                p_date = datetime.strptime(f'{items[0]}年{items[1]}月{items[2]}日', "%Y年%m月%d日").date()
            else:
                return None
    except ValueError:
        # Malformed number or date inside an otherwise recognised label.
        return None

    if p_date is None:
        return None
    return p_date.strftime('%Y-%m-%d')