| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311 |
- # -*- coding: utf-8 -*-
- """
- # description:懂车帝车友圈POC
- """
- import time
- from collections import OrderedDict
- from datetime import datetime, timedelta
- from enum import Enum
- import uiautomator2
- from flask import jsonify
- from tools import loggerKit
class account_circle_poc(Enum):
    """Car models whose Dongchedi owner circle can be crawled.

    Note: ``name`` is defined here as a method, so it replaces the standard
    ``Enum.name`` attribute; callers must write ``member.name()``.
    """

    R7 = "R7"
    F7 = "F7"

    def name(self):
        """Return the display name used to search for this model's circle."""
        if self is account_circle_poc.R7:
            return "飞凡R7"
        return "飞凡F7"
# Labels of the primary and secondary tab that are currently being crawled.
# Both start out as None.
current_primary_tab = current_secondary_tab = None
def crawl_data(device_id, car_type: account_circle_poc, task_transfer_time, sequence):
    """
    Crawl the owner-circle data for one car model and log the result.

    :param device_id: device to run the crawl on (forwarded to __get_circle_data)
    :param car_type: model to crawl; its name() is used as the search text
    :param task_transfer_time: cutoff date, forwarded to __get_circle_data
    :param sequence: task ordinal, forwarded to __get_circle_data
    """
    # Local import: json is not among the imports at the top of this file.
    import json

    data = __get_circle_data(device_id, car_type.name(), task_transfer_time, sequence)
    # Four-digit year, consistent with the '%Y-%m-%d' format used by the rest of the file.
    date = datetime.now().strftime('%Y-%m-%d')
    # Serialize with json.dumps rather than flask.jsonify: jsonify needs an active
    # Flask application context and returns a Response object, so the log line
    # would not contain the crawled payload.
    loggerKit.info("data:{0}, date:{1}", json.dumps(data, ensure_ascii=False), date)
def __get_circle_data(device_id, car_name, task_transfer_time, sequence):
    """
    Crawl the Dongchedi owner-circle ("车友圈") posts of one car via UI automation.

    Connects to the device, restarts the Dongchedi app, searches for the circle
    named ``car_name`` and walks the three primary tabs ("动态", "问答", "口碑").
    While a tab is being crawled its label is published in the module-level
    ``current_primary_tab`` / ``current_secondary_tab`` variables; both are reset
    to None when the crawl ends, including when it ends with an exception.

    :param device_id: device that executes the task (passed to uiautomator2.connect)
    :param car_name: car name typed into the search box to find the circle
    :param task_transfer_time: cutoff date of the data, forwarded to the list parser
    :param sequence: task ordinal, forwarded to the list parser
    :return: dict with the keys "dongtai" (a dict holding the "hot", "all" and
             "essence" post lists), "answer" and "opinions" (post lists)
    """
    global current_primary_tab, current_secondary_tab

    package = "com.ss.android.auto"
    res = package + ":id/"

    d = uiautomator2.connect(device_id)
    d.debug = False
    d.screen_on()
    d.unlock()
    d.press("recent")

    def sort_by_publish_time():
        # Open the "全部" filter and pick the "sort by publish time" entry.
        d(resourceId=res + "jy8", text="全部").click()
        d(resourceId=res + "io3", text="按发布时间排序").click()

    def parse_post_list(fixed_count):
        # Locate the post list of the current tab and parse it.
        recycler_view = d(resourceId=res + "hh0",
                          className="androidx.recyclerview.widget.RecyclerView")
        time.sleep(2)
        posts = __parse_dongchedi_circle_list(recycler_view, d, task_transfer_time,
                                              fixed_count, sequence)
        time.sleep(1)
        return posts

    # Restart the Dongchedi app.
    d.app_stop(package)
    d.app_start(package, ".policy.AutoPrivacyActivity")
    time.sleep(5)
    # Upgrade reminder popup: ignore the upgrade when the popup shows up.
    upgrade_dismiss = d(resourceId=res + "dtq")
    if upgrade_dismiss.wait(1):
        upgrade_dismiss.click()
    # Switch to the owner-circle ("车友圈") tab.
    d.xpath(
        '//*[@resource-id="android:id/tabs"]/android.widget.RelativeLayout[2]/android.widget.LinearLayout[1]').click()
    time.sleep(2)
    # Search for the car's circle and enter it.
    d(resourceId=res + "ipm").click()
    d(resourceId=res + "gt8").set_text(car_name)
    d(resourceId=res + "fyw").click()
    time.sleep(2)
    d(text="车友圈").click()
    d(text="进入车友圈").click()

    # (label, result key) of the primary tabs, in on-screen order; the position
    # is the 1-based LinearLayout index inside the tab container.
    primary_tabs = (("动态", "dongtai"), ("问答", "answer"), ("口碑", "opinions"))
    # (label, result key) of the secondary tabs below "动态".
    secondary_tabs = (("热门", "hot"), ("全部", "all"), ("精华", "essence"))

    data = dict()
    try:
        for position, (tab_label, tab_key) in enumerate(primary_tabs, start=1):
            current_primary_tab = tab_label
            current_secondary_tab = None
            d.xpath(
                f'//*[@resource-id="{res}ce8"]/android.widget.LinearLayout[{position}]'
                '/android.widget.ImageView[1]').click()
            time.sleep(3)

            if tab_label == "动态":
                # "动态" has secondary tabs; each one is crawled separately.
                by_sub_tab = dict()
                for sub_label, sub_key in secondary_tabs:
                    current_secondary_tab = sub_label
                    d(resourceId=res + "jy8", text=sub_label).click()
                    time.sleep(3)
                    if sub_label == "全部":
                        sort_by_publish_time()
                    # Hot posts are crawled by a fixed count of 10; every other
                    # list passes 0, which makes the parser filter by date.
                    by_sub_tab[sub_key] = parse_post_list(10 if sub_label == "热门" else 0)
                data[tab_key] = by_sub_tab
            else:
                if tab_label == "问答":
                    sort_by_publish_time()
                data[tab_key] = parse_post_list(0)
    finally:
        # Never leave stale tab names behind, even when a UI step raised.
        current_primary_tab = None
        current_secondary_tab = None

    d.app_stop(package)
    return data
def __parse_dongchedi_circle_list(listview, d: "uiautomator2.Device", task_transfer_time, get_count, sequence):
    """
    Scroll through a post list and collect the posts it shows.

    Two modes, selected by ``get_count``:

    * ``get_count > 0``: collect exactly that many posts, after skipping the
      first ``(sequence - 1) * get_count`` distinct posts.
    * ``get_count <= 0``: collect the posts published on ``task_transfer_time``;
      newer posts are scrolled past and the first older post ends the crawl.

    Posts are de-duplicated by the first 20 characters of their content.

    :param listview: UI object of the post list; an empty list is returned when it does not exist
    :param d: connected uiautomator2 device
    :param task_transfer_time: cutoff date as a '%Y-%m-%d' string
    :param get_count: number of posts to collect, or <= 0 for date-based collection
    :param sequence: 1-based task ordinal; anything that is not an int is treated as 1
    :return: list of dicts with the keys nickName, title, publishDate, shareNum, commentNum
    """
    if not listview.exists:
        return []

    res = "com.ss.android.auto:id/"
    transfer_date = datetime.strptime(task_transfer_time, '%Y-%m-%d').date()
    ret = OrderedDict()
    # get_count > 0 means fixed-count crawling; earlier tasks' posts are skipped.
    if not isinstance(sequence, int):
        sequence = 1
    skip_count = (sequence - 1) * get_count
    skip_cache = set()

    def scroll(scale=0.2):
        # Scrolling the list fails now and then, hence the retry wrapper.
        __retry(5, lambda: d.swipe_ext("up", scale))

    def list_end_reached():
        return d.xpath('//*[@text="没有更多了"]').exists

    def text_of(view):
        # A view that could not be located yields an empty text instead of
        # an AttributeError that would abort the whole crawl.
        return "" if view is None else view.get_text()

    while True:
        if list_end_reached():
            break
        if not listview.child(resourceId=res + "dvd", className="android.widget.LinearLayout").exists:
            break

        # Scroll until a post content view is on screen. The end-of-list marker
        # is re-checked here: once the list cannot scroll any further, waiting
        # for a content view would otherwise never terminate.
        hit_end = False
        while not d(resourceId=res + "jdr").wait(timeout=1):
            if list_end_reached():
                hit_end = True
                break
            scroll()
        if hit_end:
            break

        content_view = d(resourceId=res + "jdr")
        content = content_view.get_text()[:20]

        # Skip the requested number of distinct posts.
        if get_count > 0 and skip_count > 0:
            if content not in skip_cache:
                skip_count -= 1
                skip_cache.add(content)
            scroll()
            continue
        if content in skip_cache:
            scroll()
            continue

        # De-duplicate by post content.
        cache_key = content
        if cache_key in ret:
            scroll()
            continue

        # Publish time.
        time_view = __find_view(d, "down", lambda: content_view.down(resourceId=res + "ihu"))
        if time_view is None:
            scroll()
            time.sleep(2)
            continue
        time_str = __formate_time(time_view.get_text().split("最后回复")[0])

        if get_count <= 0:
            # The publish date is only compared in date-based mode.
            if time_str is None:
                # Unrecognized time text: the post cannot be compared against
                # the cutoff date, so it is skipped instead of raising.
                print(f"skip post with unparsable time, content = {content}")
                scroll()
                continue
            published_date = datetime.strptime(time_str, '%Y-%m-%d').date()
            if published_date > transfer_date:
                # The post is newer than the target date.
                if current_primary_tab == "问答":
                    # This list may miss entries, so scroll faster the
                    # further away the post is from the target date.
                    if published_date - transfer_date > timedelta(days=2):
                        scroll(1)
                    elif published_date - transfer_date > timedelta(days=1):
                        scroll(0.5)
                    else:
                        scroll()
                else:
                    # This list must not miss entries; keep scrolling slowly.
                    scroll()
                continue
            elif published_date < transfer_date:
                # The post is older than the target date: done.
                break

        # Nickname.
        nickname = text_of(__find_view(d, "up", lambda: content_view.up(resourceId=res + "v")))
        # Share count.
        share_count = __intValue(
            text_of(__find_view(d, "down", lambda: content_view.down(resourceId=res + "jte"))))
        # Comment count.
        comment_count = __intValue(
            text_of(__find_view(d, "down", lambda: content_view.down(resourceId=res + "i9j"))))
        # Title (a post may have none, so there is no retry here).
        title_text = text_of(content_view.up(resourceId=res + "t", className="android.widget.TextView"))
        title = "" if title_text == "车主评分" else title_text

        ret[cache_key] = {
            'nickName': nickname,
            'title': content if title == "" else title,
            'publishDate': time_str,
            'shareNum': share_count,
            'commentNum': comment_count,
        }
        print(f"nickname = {nickname}, share_count = {share_count}, comment_count = {comment_count}, "
              f"content = {content}, title = {title}, time = {time_str}")

        if 0 < get_count <= len(ret):
            # Fixed-count crawling reached the expected amount.
            break
        scroll()
        time.sleep(1)

    return list(ret.values())
def __retry(times, operation):
    """
    Call ``operation`` until it succeeds, at most ``times`` times.

    Failures are printed and otherwise ignored (best effort): when every attempt
    fails the function simply returns. The pause between two attempts starts at
    1 second and grows by 2 seconds after each failure. There is no pause after
    the last failed attempt, because no retry follows it.

    :param times: maximum number of attempts; nothing is called when it is <= 0
    :param operation: zero-argument callable; its return value is discarded
    """
    pause = 1
    remaining = times
    while remaining > 0:
        try:
            operation()
            return
        except Exception as e:
            print("retry exception: \n")
            print(e)
        remaining -= 1
        if remaining > 0:
            time.sleep(pause)
            pause += 2
def __intValue(value_str):
    """
    Convert a counter text to an int, falling back to 0.

    :param value_str: text to convert
    :return: ``int(value_str)``, or 0 when the value cannot be converted
    """
    # NOTE(review): an abbreviated count such as "1.2万" would come back as 0 -
    # confirm whether the app ever renders counters that way.
    try:
        return int(value_str)
    except (TypeError, ValueError, OverflowError):
        return 0
def __find_view(d: "uiautomator2.Device", direction, op):
    """
    Locate a view with ``op``, nudging the screen while it is not found.

    ``op`` is called once; while it returns None and ``direction`` is "up" or
    "down", the screen is swiped a little (swipe "up" for direction "down" and
    the other way round) and ``op`` is called again, at most 10 more times.

    :param d: connected uiautomator2 device used for swiping
    :param direction: "up" or "down" - where the view is expected relative to
                      the anchor; any other value disables the swipe retries
    :param op: zero-argument callable returning the view or None
    :return: the view, or None when it was not found or a lookup raised
    """
    try:
        found = op()
        attempts_left = 10
        may_scroll = direction in ("up", "down")
        while found is None and attempts_left > 0 and may_scroll:
            attempts_left -= 1
            __retry(5, lambda: d.swipe_ext("up" if direction == "down" else "down", 0.1))
            time.sleep(1)
            found = op()
        return found
    except Exception:
        # Best effort: a failed lookup counts as "not found". Only Exception is
        # caught so that KeyboardInterrupt / SystemExit still stop the crawl.
        return None
def __formate_time(time_str):
    """
    Normalize a publish-time text to a '%Y-%m-%d' date string.

    Recognized inputs: "刚刚", "N分钟…", "N小时…", "昨天", "前天", "N天…", "N周…"
    (all relative to the current local time), "M月D日" (current year),
    "YYYY年M月D日", "MM-DD" (current year) and "YYYY-MM-DD". For the date
    forms, text after "日" or after the first whitespace is ignored.

    :param time_str: time text as shown in the post list
    :return: the date as 'YYYY-MM-DD', or None when the text is empty, has an
             unknown format or contains an invalid number/date
    """
    if not time_str or not time_str.strip():
        return None
    text = time_str.strip()
    now = datetime.now()

    def amount_before(marker):
        # Number written in front of the unit marker. int() ignores surrounding
        # whitespace, so both "1周前" and "1 周前" are accepted.
        return int(text[:text.index(marker)])

    try:
        if '刚刚' in text:
            p_time = now
        elif '分钟' in text:
            p_time = now - timedelta(minutes=amount_before('分'))
        elif '小时' in text:
            p_time = now - timedelta(hours=amount_before('小'))
        elif '昨天' in text:
            p_time = now - timedelta(days=1)
        elif '前天' in text:
            p_time = now - timedelta(days=2)
        elif '天' in text:
            p_time = now - timedelta(days=amount_before('天'))
        elif '周' in text:
            p_time = now - timedelta(weeks=amount_before('周'))
        elif '月' in text and '日' in text:
            # Keep only the date part, e.g. "3月5日 12:30" -> "3月5日".
            day_text = text[:text.index('日') + 1]
            if '年' not in day_text:
                day_text = f'{now.year}年{day_text}'
            p_time = datetime.strptime(day_text, "%Y年%m月%d日")
        else:
            items = text.split()[0].split("-")
            if len(items) == 2:
                p_time = datetime.strptime(f'{now.year}年{items[0]}月{items[1]}日', "%Y年%m月%d日")
            elif len(items) == 3:
                p_time = datetime.strptime(f'{items[0]}年{items[1]}月{items[2]}日', "%Y年%m月%d日")
            else:
                return None
    except (ValueError, OverflowError):
        # Malformed number or impossible date: report it like an unknown
        # format instead of raising.
        return None

    return p_time.strftime('%Y-%m-%d')
|