# douyin_interaction_urgent.py
  1. import time
  2. from urllib.parse import quote
  3. from func.action_func import del_key_vague
  4. from task.task_job import callback_task
  5. from tools import loggerKit, redis_client
  6. from scene.oprator.atom_data import start_app, stop_app, get_text_by_control, single_click_by_control, \
  7. send_text_by_control
  8. import threading
  9. import requests
  10. import yaml
  11. import json
  12. import httpx
  13. with open('config.yaml', 'r') as file:
  14. config = yaml.load(file, Loader=yaml.FullLoader)
  15. extern_domain = config['bmp-cp']['extern_domain']
  16. # 任务执行回调url
  17. task_callback_url = extern_domain + config['bmp-cp']['task_callback_url']
  18. # 调用AIGC接口
  19. post_ai_gc_url = extern_domain + config['bmp-content-center']['comment_local_url']
  20. get_commented_list_url = extern_domain + config['bmp-cp']['get_commented_list_url']
  21. # 紧急公关 抖音互动 (根据视频链接 进行搜索获取到对应的视频进行评论)
  22. # version 29.5.0
  23. def douyin_spider(task_id, keyword, data):
  24. loggerKit.info('请求信息:{0}'.format(data))
  25. device_id = data.get("deviceID")
  26. perform_action_id = data.get("performActionId")
  27. result = data.get("result")
  28. extension_info = data['data']['extendInfo']
  29. extension_info = json.loads(extension_info)
  30. keyword = extension_info['url']
  31. summary = extension_info['summary']
  32. if summary is not None and summary != '':
  33. sub_resource_name = summary
  34. else:
  35. sub_resource_name = extension_info['title']
  36. if result is not None:
  37. """
  38. 非首个指令
  39. """
  40. perform_action_result = result.get("performActionResult")
  41. if perform_action_result is None:
  42. return_dict = {
  43. "data": "",
  44. "code": -2,
  45. "message": "fail, performActionResult is null"
  46. }
  47. # 回调任务中心
  48. del_key_vague(device_id)
  49. callback_task(500, '指令执行失败', task_id, device_id, 0, None, None)
  50. return return_dict
  51. if perform_action_result == "failure":
  52. # 回调任务中心
  53. return_dict = {
  54. "data": "",
  55. "code": -2,
  56. "message": "fail, performActionResult is not success"
  57. }
  58. del_key_vague(device_id)
  59. callback_task(500, '任务执行失败,可能当前无法再次执行看广告任务', task_id, device_id, 0, None, None)
  60. return return_dict
  61. # 元素未找到
  62. if perform_action_result == "invalid operation":
  63. # 回调任务中心
  64. return_dict = {
  65. "data": "",
  66. "code": -2,
  67. "message": "fail, performActionResult is null"
  68. }
  69. del_key_vague(device_id)
  70. callback_task(500, '指令执行失败', task_id, device_id, 0, None)
  71. return return_dict
  72. if perform_action_result == "stop":
  73. # 回调任务中心
  74. return_dict = {
  75. "data": "",
  76. "code": -2,
  77. "message": "fail, performActionResult is not success"
  78. }
  79. del_key_vague(device_id)
  80. callback_task(500, '指令被用户终止', task_id, device_id, 0, None, None)
  81. return return_dict
  82. # 每次操作完成后会将对应的操作唯一id存储到redis,并且返回给手机端 手机端下次带着上个操作id来执行下一个操作
  83. last_action_id = redis_client.get(device_id + "operate")
  84. step0 = redis_client.get(f"{device_id}_step0")
  85. if step0 is not None and int(step0) == 1 and last_action_id is not None and int(perform_action_id) == int(
  86. last_action_id) and perform_action_result == "success":
  87. action1_id = int(round(time.time() * 1000))
  88. """
  89. 发送第1条指令
  90. 点击搜索框
  91. """
  92. action1_dict = single_click_by_control(action1_id, target_app="douyin", target_version="28.8.0",
  93. package_name="com.ss.android.ugc.aweme.lite",
  94. control_id="com.ss.android.ugc.aweme.lite:id/fsf", timeout=5)
  95. redis_client.set(device_id + "operate", action1_id)
  96. redis_client.set(f"{device_id}_step1", "1")
  97. redis_client.delete(f"{device_id}_step0")
  98. loggerKit.info("taskId:{0}, action1_id:{1}", task_id, action1_id)
  99. return action1_dict
  100. step1 = redis_client.get(f"{device_id}_step1")
  101. if step1 is not None and int(step1) == 1 and last_action_id is not None and int(perform_action_id) == int(
  102. last_action_id) and perform_action_result == "success":
  103. loggerKit.info("设备:{0}, action2_id_mem:{1}", device_id, int(last_action_id))
  104. action2_id = int(round(time.time() * 1000))
  105. """
  106. 发送第2条指令
  107. 对搜索框进行赋值
  108. """
  109. action2_dict = send_text_by_control(action2_id, target_app="douyin", target_version="28.8.0",
  110. package_name="com.ss.android.ugc.aweme.lite",
  111. control_id="com.ss.android.ugc.aweme.lite:id/et_search_kw",
  112. content=keyword, timeout=5)
  113. redis_client.set(device_id + "operate", action2_id)
  114. redis_client.set(f"{device_id}_step2", "1")
  115. redis_client.delete(f"{device_id}_step1")
  116. loggerKit.info("taskId:{0}, action2_id:{1}", task_id, action2_id)
  117. return action2_dict
  118. step2 = redis_client.get(f"{device_id}_step2")
  119. if step2 is not None and int(step2) == 1 and last_action_id is not None and int(perform_action_id) == int(
  120. last_action_id) and perform_action_result == "success":
  121. loggerKit.info("设备:{0}, action3_id_mem:{1}", device_id, int(last_action_id))
  122. action3_id = int(round(time.time() * 1000))
  123. """
  124. 发送第3条指令
  125. 点击搜索按钮
  126. """
  127. action3_dict = single_click_by_control(action3_id, target_app="douyin", target_version="28.8.0",
  128. package_name="com.ss.android.ugc.aweme.lite",
  129. control_id="com.ss.android.ugc.aweme.lite:id/nou")
  130. redis_client.set(device_id + "operate", action3_id)
  131. redis_client.set(f"{device_id}_step3", "1")
  132. redis_client.delete(f"{device_id}_step2")
  133. loggerKit.info("taskId:{0}, action3_id:{1}", task_id, action3_id)
  134. return action3_dict
  135. step3 = redis_client.get(f"{device_id}_step3")
  136. if step3 is not None and int(step3) == 1 and last_action_id is not None and int(perform_action_id) == int(
  137. last_action_id) \
  138. and perform_action_result == "success":
  139. loggerKit.info("taskId:{0}, action4_id:{1}", task_id, int(last_action_id))
  140. action4_id = int(round(time.time() * 1000))
  141. """
  142. 发送第4条指令
  143. 根据视频内容 生成评论文案将文案缓存 并且点击评论
  144. 如果没获取到对应的评论 任务结束 关闭
  145. """
  146. if sub_resource_name is None or sub_resource_name == '':
  147. action4_dict = stop_app(action4_id, target_app="douyin", target_version="28.8.0",
  148. package_name="com.ss.android.ugc.aweme.lite", timeout=5)
  149. # 回调任务中心
  150. del_key_vague(device_id)
  151. callback_task(None, '获取评论文案的标题为null,所以未进行评论', task_id, device_id, 1, None, None)
  152. loggerKit.info("taskId:{0}, action4_id_mem:{1},对应的标题为null无法获取评论文案", task_id,
  153. int(last_action_id))
  154. return action4_dict
  155. comment_response = httpx.get(get_commented_list_url + '?resourceName=' + keyword, timeout=120)
  156. # 评论实体
  157. comment_body = json.loads(comment_response.text)
  158. # 评论集合
  159. comment_list = comment_body.get('data')
  160. loggerKit.info("任务id:{0}对应标题:{1},已有评论:{2}", task_id, keyword, comment_list)
  161. existing_comment = ''
  162. if comment_list is not None:
  163. comment_list = list(set(comment_list))
  164. existing_comment = '||'.join(comment_list)
  165. loggerKit.info("任务id:{0}对应标题:{1},拼接后的评论集合:{2}", task_id, keyword, existing_comment)
  166. existing_comment = quote(existing_comment)
  167. request_data = {
  168. "platform": "douyin",
  169. "content": sub_resource_name,
  170. "comment": existing_comment,
  171. "mode": "1"
  172. }
  173. loggerKit.info("任务id:{0}对应标题:{1},请求AIGC信息:{2}", task_id, keyword, request_data)
  174. response = httpx.post(post_ai_gc_url, json=request_data, timeout=120)
  175. loggerKit.info("任务id:{0}对应标题:{1},AIGC返回信息:{2},接口:{3}", task_id, keyword, response,post_ai_gc_url)
  176. # 点击评论框
  177. action4_dict = single_click_by_control(action4_id, target_app="douyin", target_version="28.8.0",
  178. package_name="com.ss.android.ugc.aweme.lite",
  179. control_id="com.ss.android.ugc.aweme.lite:id/comment_container",
  180. timeout=5)
  181. if not response.is_success:
  182. # 调用AIGC获取评论失败
  183. loggerKit.info("任务id:{0}对应标题:{1},请求AIGC失败信息:{2},返回信息:{3}", task_id, keyword,
  184. request_data, response)
  185. redis_client.set(device_id + "douyin" + "comments", '不错👍')
  186. redis_client.set(device_id + "operate", action4_id)
  187. redis_client.set(f"{device_id}_step4", "1")
  188. redis_client.delete(f"{device_id}_step3")
  189. loggerKit.info("设备:{0}, action4_id:{1}", device_id, action4_id)
  190. return action4_dict
  191. response_body = json.loads(response.text)
  192. reply_data = response_body.get('data')
  193. reply = reply_data.get('comment')
  194. if '内容太少' in reply or '对不起' in reply or reply is None or reply == '' or '提供' in reply:
  195. # 获取到的评论内容不符合
  196. loggerKit.info("任务id:{0}对应标题:{1},请求AIGC失败信息:{2},返回信息:{3}", task_id, keyword,
  197. request_data, response)
  198. redis_client.set(device_id + "douyin" + "comments", '不错👍')
  199. redis_client.set(device_id + "operate", action4_id)
  200. redis_client.set(f"{device_id}_step4", "1")
  201. redis_client.delete(f"{device_id}_step3")
  202. loggerKit.info("设备:{0}, action4_id:{1}", device_id, action4_id)
  203. return action4_dict
  204. # 将需要评论的内容存入缓存
  205. redis_client.set(device_id + "douyin" + "comments", reply)
  206. redis_client.set(device_id + "operate", action4_id)
  207. redis_client.set(f"{device_id}_step4", "1")
  208. redis_client.delete(f"{device_id}_step3")
  209. loggerKit.info("设备:{0}, action4_id:{1}", device_id, action4_id)
  210. return action4_dict
  211. step4 = redis_client.get(f"{device_id}_step4")
  212. if step4 is not None and int(step4) == 1 and last_action_id is not None and int(perform_action_id) == int(
  213. last_action_id) \
  214. and perform_action_result == "success":
  215. loggerKit.info("taskId:{0}, action3_id_mem:{1}", task_id, int(last_action_id))
  216. action4_id = int(round(time.time() * 1000))
  217. """
  218. 发送第4条指令
  219. 点击评论框
  220. """
  221. action4_dict = single_click_by_control(action4_id, target_app="douyin", target_version="28.8.0",
  222. package_name="com.ss.android.ugc.aweme.lite",
  223. control_id="com.ss.android.ugc.aweme.lite:id/bq0", timeout=5)
  224. redis_client.set(device_id + "operate", action4_id)
  225. redis_client.set(f"{device_id}_step5", "1")
  226. redis_client.delete(f"{device_id}_step4")
  227. loggerKit.info("taskId:{0}, action5_id:{1}", task_id, action4_id)
  228. return action4_dict
  229. step5 = redis_client.get(f"{device_id}_step5")
  230. if step5 is not None and int(step5) == 1 and last_action_id is not None and int(perform_action_id) == int(
  231. last_action_id) \
  232. and perform_action_result == "success":
  233. loggerKit.info("taskId:{0}, action5_id_mem:{1}", task_id, int(last_action_id))
  234. action6_id = int(round(time.time() * 1000))
  235. """
  236. 发送第6条指令
  237. 对评论框进行赋值
  238. """
  239. reply = redis_client.get(device_id + "douyin" + "comments")
  240. if reply is None:
  241. # 调用AIGC获取评论失败
  242. del_key_vague(device_id)
  243. return_dict = {
  244. "data": "",
  245. "code": -2,
  246. "message": "fail, result_text is null"
  247. }
  248. callback_task(500, '抖音回复内容为null,无法评论', task_id, device_id, 0, None, None)
  249. return return_dict
  250. action6_dict = send_text_by_control(action6_id, target_app="douyin", target_version="28.8.0",
  251. package_name="com.ss.android.ugc.aweme.lite",
  252. control_id="com.ss.android.ugc.aweme.lite:id/bq0",
  253. content=reply, timeout=5)
  254. redis_client.set(device_id + "operate", action6_id)
  255. redis_client.set(f"{device_id}_step6", "1")
  256. redis_client.delete(f"{device_id}_step5")
  257. content_result = content_detail(keyword, sub_resource_name, reply, None)
  258. redis_client.set(device_id + "reply_json", json.dumps(content_result.to_dict(), ensure_ascii=False))
  259. loggerKit.info("设备:{0}, action6_id:{1}", device_id, action6_id)
  260. return action6_dict
  261. step6 = redis_client.get(f"{device_id}_step6")
  262. if step6 is not None and int(step6) == 1 and last_action_id is not None and int(perform_action_id) == int(
  263. last_action_id) \
  264. and perform_action_result == "success":
  265. loggerKit.info("设备:{0}, action6_id_mem:{1}", device_id, int(last_action_id))
  266. action7_id = int(round(time.time() * 1000))
  267. """
  268. 发送第7条指令
  269. 发送评论
  270. """
  271. action7_dict = single_click_by_control(action7_id, target_app="douyin", target_version="28.8.0",
  272. package_name="com.ss.android.ugc.aweme.lite",
  273. control_id="com.ss.android.ugc.aweme.lite:id/bg=", timeout=5)
  274. redis_client.set(device_id + "operate", action7_id)
  275. redis_client.set(f"{device_id}_step7", "1")
  276. redis_client.delete(f"{device_id}_step6")
  277. loggerKit.info("设备:{0}, action7_id:{1}", device_id, action7_id)
  278. return action7_dict
  279. """
  280. 停止app
  281. """
  282. step7 = redis_client.get(f"{device_id}_step7")
  283. if step7 is not None and int(step7) == 1 and last_action_id is not None and int(perform_action_id) == int(
  284. last_action_id) and perform_action_result == "success":
  285. loggerKit.info("设备:{0}, action10_id_mem:{1}", device_id, int(last_action_id))
  286. action8_id = int(round(time.time() * 1000))
  287. """
  288. 停止指令
  289. 停止app
  290. """
  291. action11_dict = stop_app(action8_id, target_app="douyin", target_version="28.8.0",
  292. package_name="com.ss.android.ugc.aweme.lite", timeout=5)
  293. reply_json = redis_client.get(device_id + "reply_json")
  294. del_key_vague(device_id)
  295. loggerKit.info("设备:{0}, action11_id:{1}", device_id, action8_id)
  296. # 回调任务中心修改任务状态
  297. callback_task(None, None, task_id, device_id, 1, None, reply_json)
  298. return action11_dict
  299. else:
  300. action0_id = int(round(time.time() * 1000))
  301. """
  302. 启动指令
  303. 启动app
  304. """
  305. action0_dict = start_app(action0_id, target_app="douyin", target_version="29.5.0",
  306. package_name="com.ss.android.ugc.aweme.lite")
  307. redis_client.set(device_id + "operate", action0_id)
  308. redis_client.set(f"{device_id}_step0", "1")
  309. redis_client.delete(f"{device_id}_step9")
  310. loggerKit.info("设备:{0}, action0_id:{1}", device_id, action0_id)
  311. return action0_dict
# Bulk fuzzy-delete of cached keys (disabled local copy; the file imports
# del_key_vague from func.action_func instead).
# def del_key_vague(device_id):
#     # Bulk fuzzy-delete keys
#     keys = redis_client.match_pattern_prefix(device_id)
#     if len(keys) > 0:
#         # Must check that something matched; otherwise the delete raises an error
#         for key in keys:
#             redis_client.delete(key)
#         loggerKit.info(f"clear {device_id} keys success...")
#     else:
#         loggerKit.info(f"{device_id} keys none ...")
  323. def callback_task(err_code, err_msg, task_id, device_id, execute_status, result, content):
  324. callback_request2 = call_back_request(err_code, err_msg, task_id, execute_status, result, content, None)
  325. loggerKit.info("thread[{0}=>{1}], taskId:{2}, 设备号:{3}, url:{4}, 执行状态:{5}, requestJson:{6} 。回调开始",
  326. threading.current_thread().name,
  327. threading.get_ident(), task_id, device_id, task_callback_url, execute_status, callback_request2)
  328. current_timeout = (5, 10)
  329. callback_response2 = requests.post(task_callback_url, json=callback_request2, timeout=current_timeout)
  330. loggerKit.info("thread[{0}=>{1}], taskId:{2},设备号:{3}。回调结束:{4}, 执行状态:{5}, result:{6}",
  331. threading.current_thread().name,
  332. threading.get_ident(), task_id, device_id, json.dumps(callback_response2.text, ensure_ascii=False),
  333. execute_status, result)
  334. return callback_response2
  335. """
  336. 添加了其他参数
  337. """
  338. # post请求
  339. def call_back_request(error_code, error_msg, task_id, task_status, task_execute_response, content, demo_flag):
  340. response = {
  341. "errorCode": error_code,
  342. "errorMsg": error_msg,
  343. "taskId": task_id,
  344. "taskStatus": task_status,
  345. "taskExecuteResponse": task_execute_response,
  346. "content": content,
  347. "demoFlag": demo_flag
  348. }
  349. return response
  350. # 回复详情
  351. class content_detail:
  352. def __init__(self, keyword, title, reply, account_name):
  353. """
  354. :rtype: object
  355. """
  356. # 搜索关键词
  357. self.keyword = keyword
  358. self.title = title
  359. self.reply = reply
  360. self.account_name = account_name
  361. def to_dict(self):
  362. return {
  363. 'keyword': self.keyword,
  364. 'title': self.title,
  365. 'reply': self.reply,
  366. 'accountName': self.account_name
  367. }