file_util.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742
  1. # coding: utf-8
  2. import base64
  3. import binascii
  4. import datetime
  5. import hashlib
  6. import ipaddress
  7. import logging
  8. import os
  9. import random
  10. import re
  11. import socket
  12. import string
  13. import subprocess
  14. import sys
  15. import urllib
  16. from collections import OrderedDict
  17. from urllib.parse import urlparse, urlunparse
  18. # 上面是标准库,下面是第三方库库
  19. import socks
  20. # python3.8 -m pip install PySocks
  21. from bs4 import BeautifulSoup
  22. def choose_from_list(items):
  23. """
  24. 返回一个选中的list。
  25. """
  26. while True:
  27. try:
  28. print("Available options:")
  29. for index, item in enumerate(items):
  30. print(f"{index} {item}")
  31. print(f"{len(items)} all")
  32. user_input = input("Choose your item(s) with index (comma-separated for multiple):\n")
  33. if user_input.lower() == 'all':
  34. return items
  35. selected_indices = [int(index.strip()) for index in user_input.split(',') if index.strip().isdigit()]
  36. if all(0 <= index < len(items) for index in selected_indices):
  37. return [items[index] for index in selected_indices]
  38. else:
  39. print("Invalid index number. Please try again.")
  40. except EOFError:
  41. sys.exit(0)
  42. except KeyboardInterrupt:
  43. sys.exit(0)
  44. except ValueError:
  45. print("Invalid input. Please enter valid index number(s).")
  46. def choose_from_iterable(iterable):
  47. """
  48. 返回一个选中的list。
  49. """
  50. ordered_iterable = iterable
  51. # 如果输入是字典,则转换为有序字典
  52. if isinstance(iterable, dict):
  53. ordered_iterable = OrderedDict(iterable)
  54. while True:
  55. try:
  56. print("Available options:")
  57. for index, item in enumerate(ordered_iterable):
  58. print(f"{index} {item}")
  59. print(f"{len(ordered_iterable)} all")
  60. user_input = input("Choose your item(s) with index (comma-separated for multiple):\n")
  61. if user_input.lower() == 'all':
  62. return list(ordered_iterable.values())
  63. selected_indices = [int(index.strip()) for index in user_input.split(',') if index.strip().isdigit()]
  64. if all(0 <= index < len(ordered_iterable) for index in selected_indices):
  65. selected_items = [list(ordered_iterable.values())[index] for index in selected_indices]
  66. return selected_items
  67. else:
  68. print("Invalid index number. Please try again.")
  69. except EOFError:
  70. sys.exit(0)
  71. except KeyboardInterrupt:
  72. sys.exit(0)
  73. except ValueError:
  74. print("Invalid input. Please enter valid index number(s).")
  75. def get_file_content(file_path):
  76. encodings = ["utf-8", "gbk"]
  77. for encoding in encodings:
  78. try:
  79. with open(file_path, "r", encoding=encoding) as fp:
  80. return fp.read()
  81. except:
  82. continue
  83. return None
  84. def get_full_path(path):
  85. if path.startswith("~"):
  86. path = os.path.expanduser(path)
  87. if path.startswith("./") or path.startswith(".\\"):
  88. path = os.path.abspath(path)
  89. return path
  90. def clean_list(lines, spliter=None, do_strip=True, remove_empty=True):
  91. """
  92. 如果有分割符,会对每行再进行分割
  93. 默认对每个元素进行strip
  94. 默认删除空字符串
  95. :param lines:
  96. :param spliter:
  97. :param do_strip:
  98. :param remove_empty:
  99. :return:
  100. """
  101. if isinstance(lines, list):
  102. if spliter:
  103. lines = [item for line in lines for item in line.split(spliter)]
  104. if do_strip:
  105. lines = [line.strip() for line in lines]
  106. if remove_empty and "" in lines:
  107. lines = [line for line in lines if line != ""]
  108. return lines
  109. return lines
  110. def get_lines_from_file(file_path, spliter=";", do_strip=True, remove_empty=True):
  111. """
  112. 从文件中读行,返回一个列表。
  113. 如果有分割符,会对每行再进行分割
  114. 默认对每个元素进行strip
  115. 默认删除空字符串
  116. :param file_path:
  117. :param spliter:
  118. :param do_strip:
  119. :param remove_empty:
  120. :return:
  121. """
  122. encodings_to_try = ['utf-8', 'gbk'] # 尝试的编码列表
  123. for encoding in encodings_to_try:
  124. try:
  125. with open(file_path, 'r', encoding=encoding) as f:
  126. lines = f.readlines()
  127. return clean_list(lines, spliter, do_strip, remove_empty)
  128. except UnicodeDecodeError:
  129. continue
  130. except FileNotFoundError:
  131. print(f"File not found: {file_path}")
  132. return None
  133. except Exception as e:
  134. print(f"An error occurred: {e}")
  135. return None
  136. return None # 如果都尝试失败,则返回 None
  137. def get_lines_from_quote(text, spliter=";", do_strip=True, remove_empty=True):
  138. if not text or not isinstance(text, str):
  139. return []
  140. lines = text.splitlines()
  141. return clean_list(lines, spliter, do_strip, remove_empty)
  142. def get_lines_from_console(spliter=";", do_strip=True, remove_empty=True):
  143. lines = []
  144. print("Enter multiple lines of text (Ctrl+D or Ctrl+Z to end):")
  145. while True:
  146. try:
  147. line = input()
  148. lines.append(line)
  149. except EOFError:
  150. break
  151. return clean_list(lines, spliter, do_strip, remove_empty)
  152. def is_valid_domain(domain):
  153. domain_pattern = "^((?!-)[A-Za-z0-9-]{1,63}(?<!-)\\.)+[A-Za-z]{2,6}$"
  154. return re.match(domain_pattern, domain) is not None
  155. def is_valid_host(host):
  156. if not host:
  157. return False
  158. try:
  159. return is_valid_domain(host) or is_valid_ip(host)
  160. except:
  161. return False
  162. def is_valid_ip(host):
  163. try:
  164. ipaddress.ip_address(host)
  165. return True
  166. except ValueError:
  167. return False
  168. def is_valid_subnet(subnet):
  169. '''
  170. strict =False,因为想要 192.168.1.1/27这个格式返回true
  171. :param subnet:
  172. :return:
  173. '''
  174. try:
  175. ipaddress.ip_network(subnet, strict=False)
  176. return True
  177. except ValueError:
  178. return False
  179. def is_valid_domain_by_query(host):
  180. try:
  181. socket.getaddrinfo(host, None)
  182. return True
  183. except socket.gaierror:
  184. return False
  185. def is_valid_port(port):
  186. try:
  187. p = int(port)
  188. if 0 <= p <= 65535:
  189. return True
  190. except:
  191. pass
  192. return False
  193. def get_ip_list_of_subnet(subnet):
  194. try:
  195. tmp = ipaddress.ip_network(subnet, strict=False)
  196. result = [item.__str__() for item in tmp.hosts()]
  197. return result
  198. except ValueError:
  199. return []
  200. def get_logger(log_file_name='logger.log'):
  201. """
  202. # https://stackoverflow.com/questions/7016056/python-logging-not-outputting-anything
  203. # 只是将handler的level改成debug是不够的,还需要设置logger本身的level。logger是上游,handler是下游
  204. :param log_file_name:
  205. :return:
  206. """
  207. formatter = logging.Formatter('%(asctime)s | %(levelname)s | %(message)s')
  208. # 使用basicConfig设置全局的日志级别
  209. logging.basicConfig(level=logging.DEBUG)
  210. # 创建logger
  211. logger = logging.getLogger('main')
  212. # 创建和设置StreamHandler和FileHandler
  213. handlers = [logging.StreamHandler(sys.stdout), logging.FileHandler(log_file_name, encoding="utf-8")]
  214. for handler in handlers:
  215. handler.setFormatter(formatter)
  216. logger.addHandler(handler)
  217. return logger
  218. def gen_random_str(to_void):
  219. # 定义包含所有可能字符的字符集合
  220. while True:
  221. # characters = string.ascii_letters + string.digits + string.punctuation
  222. characters = string.ascii_letters + string.digits
  223. # 使用random.choices()函数从字符集合中随机选择字符,并生成随机字符串
  224. random_string = ''.join(random.choices(characters, k=5))
  225. if to_void and random_string in to_void:
  226. continue
  227. else:
  228. return random_string
  229. def highlight_print(content, tips=""):
  230. if not tips:
  231. tips = ""
  232. print()
  233. print(("#" * 10 + "{}" + "#" * 10).format(tips))
  234. print(content)
  235. print("#" * (20 + len(str(tips))))
  236. print()
  237. def set_socks_proxy(proxy_host, proxy_port):
  238. """
  239. 设置全局的 SOCKS 代理,适用于所有套接字操作。
  240. socks.set_default_proxy() # 取消代理设置
  241. """
  242. try:
  243. import socks
  244. import socket
  245. proxy_port = int(proxy_port)
  246. print("set proxy: {}:{}".format(proxy_host, proxy_port))
  247. socks.set_default_proxy(socks.SOCKS5, proxy_host, proxy_port)
  248. socket.socket = socks.socksocket
  249. return True
  250. except:
  251. print("set socks proxy failed!!!")
  252. return False
  253. def is_using_socks_proxy():
  254. """
  255. 检测当前程序是否使用了 SOCKS 代理。
  256. :return: 如果使用了 SOCKS 代理,返回 True;否则返回 False。
  257. """
  258. import socket
  259. return socket.socket == socks.socksocket
  260. def get_base_url(url):
  261. '''
  262. return 末尾不包含/
  263. 引用方法:
  264. from 包名处(模块名称).文件名称 import 函数名称
  265. 包或者模块,是指含有__init__.py的文件夹
  266. '''
  267. parsed_url = urlparse(url)
  268. base_url = urlunparse((parsed_url.scheme, parsed_url.netloc, '', '', '', ''))
  269. return base_url
  270. def url_encode(url):
  271. return urllib.parse.quote(url)
  272. def url_decode(url):
  273. return urllib.parse.unquote(url)
  274. def get_argv(num_of_arg=1):
  275. result = []
  276. if len(sys.argv) > num_of_arg:
  277. # return sys.argv[1:-1] #这是错误的,末尾index所在元素是不会被包含的
  278. return sys.argv[1:num_of_arg + 222] # 只要大于长度都可以!
  279. else:
  280. index = 0
  281. while index < num_of_arg:
  282. arg = input("Enter argument {}: ".format(index + 1))
  283. result.append(arg)
  284. index += 1
  285. return result
  286. def get_textarea_contents(html, name=None):
  287. # Parse the HTML with BeautifulSoup
  288. soup = BeautifulSoup(html, 'html.parser')
  289. if name:
  290. # Find all <textarea> tags with the specified name attribute and extract their content
  291. textarea_contents = [textarea.text.strip() for textarea in soup.select(f'textarea[name="{name}"]')]
  292. else:
  293. # Find all <textarea> tags and extract their content
  294. textarea_contents = [textarea.text.strip() for textarea in soup.find_all('textarea')]
  295. return textarea_contents
  296. def get_content_by_class(html, class_name):
  297. """
  298. 根据
  299. :param html:
  300. :param class_name:
  301. :return:
  302. """
  303. # 使用 BeautifulSoup 解析 HTML
  304. soup = BeautifulSoup(html, 'html.parser')
  305. # 使用 CSS 选择器定位元素
  306. elements = soup.select(f'.{class_name}')
  307. # 提取元素的内容
  308. content = [element.get_text() for element in elements]
  309. return content
  310. def get_content_by_element(html, element_name):
  311. # 使用 BeautifulSoup 解析 HTML
  312. soup = BeautifulSoup(html, 'html.parser')
  313. # 使用 CSS 选择器定位元素
  314. elements = soup.select(element_name)
  315. # 提取元素的内容
  316. content = [element.get_text() for element in elements]
  317. return content
  318. def get_full_path_for_file(path):
  319. if path.startswith("~"):
  320. path = os.path.expanduser(path)
  321. if path.startswith("./") or path.startswith(".\\"):
  322. path = os.path.abspath(path)
  323. return path
  324. def is_file_path_by_pattern(path):
  325. # 定义常见文件路径的正则表达式模式
  326. path_patterns = [
  327. r'^[a-zA-Z]:\\[^:*?"<>|\r\n]*$', # Windows 绝对路径
  328. r'^[a-zA-Z]:/[^:*?"<>|\r\n]*$', # Windows 绝对路径(斜杠)
  329. r'^/[^:*?"<>|\r\n]*$', # Linux/MacOS 绝对路径
  330. r'^\.[a-zA-Z0-9_/-]*$', # 相对路径
  331. r'^\.\.[a-zA-Z0-9_/-]*$' # 相对路径(上级目录)
  332. ]
  333. # 使用正则表达式匹配路径
  334. for pattern in path_patterns:
  335. if re.match(pattern, path):
  336. return True
  337. # 如果没有匹配任何模式,则判定为无效路径
  338. return False
  339. def get_ip(host):
  340. try:
  341. return socket.gethostbyname(host)
  342. except:
  343. return None
  344. def startswith_regex(pattern, text):
  345. """
  346. 尝试从字符串的[开头]匹配模式,如果匹配成功则返回True,否则返回False
  347. """
  348. match = re.match(pattern, text)
  349. if match:
  350. return True
  351. else:
  352. return False
  353. def extract_between(text, start, end):
  354. """
  355. 提取2个字符串之间的内容,返回一个列表
  356. :param text:
  357. :param start:
  358. :param end:
  359. :return:
  360. """
  361. pattern = re.escape(start) + r"(.*?)" + re.escape(end)
  362. matches = re.findall(pattern, text)
  363. if matches:
  364. return matches
  365. else:
  366. return []
  367. def findfirst_regex(pattern, text):
  368. """
  369. 在整个字符串中搜索匹配,如果找到则返回一个匹配对象,否则返回None
  370. """
  371. if not (pattern and text):
  372. return None
  373. search_result = re.search(pattern, text)
  374. if search_result:
  375. return search_result.group(0)
  376. return None
  377. # 使用re.findall查找所有匹配
  378. def findall_regex(pattern, text):
  379. """
  380. <td><a href="(.*?)" class="model-link inside">
  381. 根据正则表达式提取所有匹配的内容,返回一个列表
  382. """
  383. if not (pattern and text):
  384. return []
  385. result_list = re.findall(pattern, text)
  386. return result_list
  387. def replaceall_regex(pattern, replaceto, text):
  388. """
  389. 将正则表达式匹配到的内容替换为replaceto的内容,返回替换后的完整文本
  390. """
  391. if not (pattern and replaceto and text):
  392. return text
  393. new_text = re.sub(pattern, replaceto, text)
  394. return new_text
  395. def get_base_url_for_file(url):
  396. """
  397. return 末尾不包含/
  398. 引用方法:
  399. from 包名处(模块名称).文件名称 import 函数名称
  400. 包或者模块,是指含有__init__.py的文件夹
  401. """
  402. parsed_url = urlparse(url)
  403. base_url = urlunparse((parsed_url.scheme, parsed_url.netloc, '', '', '', ''))
  404. return base_url
  405. def get_files_in_path(path):
  406. """
  407. 获取某个路径中所有文件的绝对路径
  408. """
  409. file_list = []
  410. # 如果是文件,直接返回文件的绝对路径
  411. if os.path.isfile(path):
  412. return [os.path.abspath(path)]
  413. # 如果是目录,遍历目录及其子目录,返回所有文件的绝对路径列表
  414. if os.path.isdir(path):
  415. for root, dirs, files in os.walk(path):
  416. for file in files:
  417. file_path = os.path.join(root, file)
  418. file_list.append(os.path.abspath(file_path))
  419. return file_list
  420. def contains_any(string, keywords):
  421. """
  422. 判断string是否包含任何一个关键词
  423. """
  424. if isinstance(keywords, str):
  425. return keywords in string
  426. if isinstance(keywords, (list, set)):
  427. for keyword in keywords:
  428. if keyword in string:
  429. return True
  430. return False
  431. def print_all_str_vars(keywords_to_exclude=None):
  432. """
  433. 打印用户定义的所有字符串变量,可以设置关键词根据变量名进行排除
  434. """
  435. if keywords_to_exclude is None:
  436. keywords_to_exclude = {}
  437. all_variables = globals()
  438. for var_name, var_value in all_variables.items():
  439. if not isinstance(var_value, str):
  440. continue
  441. if var_name.startswith("__") and var_name.endswith("__"):
  442. continue
  443. if contains_any(var_name, keywords_to_exclude):
  444. continue
  445. else:
  446. print(var_value)
  447. def split_line(line):
  448. """
  449. 将一行字符串分割成多个部分,连续的tab和空格都当作一个分隔符
  450. testcase = "This\tis \t a\t\t test string\t"
  451. testcase = "aaa bbb"
  452. :param line:
  453. :return:
  454. """
  455. # parts = re.split(r'\s+|\t+', line)
  456. parts = line.split()
  457. # 字符串的split函数本身就有这样的能力
  458. return parts
  459. def md5(input_string):
  460. # 创建一个 MD5 哈希对象
  461. md5_hash = hashlib.md5()
  462. # 更新哈希对象以包含输入字符串的字节表示
  463. md5_hash.update(input_string.encode('utf-8'))
  464. # 获取 MD5 哈希值的十六进制表示
  465. md5_hex = md5_hash.hexdigest()
  466. return md5_hex
  467. def get_time_str():
  468. """
  469. 返回当前时间的字符串,常用于文件名
  470. :return:
  471. """
  472. current_time = datetime.datetime.now()
  473. formatted_time = current_time.strftime("%Y_%m_%d_%H_%M_%S_%f")[:-3]
  474. return formatted_time
  475. def run_external_program(command):
  476. """
  477. 注意,要执行的命令、脚本,基本都要求绝对路径
  478. :param command:
  479. :return:
  480. """
  481. try:
  482. # Run the external program and capture its output
  483. result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, shell=True)
  484. # Check if the command was successful (return code 0)
  485. if result.returncode == 0:
  486. # Return the standard output
  487. return result.stdout.strip()
  488. else:
  489. # If the command failed, print the error message
  490. print(f"Error: {result.stderr.strip()}")
  491. return None
  492. except Exception as e:
  493. print(f"An error occurred: {e}")
  494. return None
  495. def deduplicate_list(input_list):
  496. # 使用OrderedDict.fromkeys()去重,并保持原始顺序
  497. deduplicated_dict = OrderedDict.fromkeys(input_list)
  498. # 将字典的键转换回列表
  499. deduplicated_list = list(deduplicated_dict.keys())
  500. return deduplicated_list
  501. def is_hex_code(input_str):
  502. """
  503. hello -- 68656C6C6F
  504. :param input_str:
  505. :return:
  506. """
  507. return re.match(r"^[0-9a-fA-F]+$", input_str) is not None
  508. def hex_code_to_byte_array(hex_code):
  509. r"""
  510. 68656C6C6F --- b'hello'
  511. 1112136162 --- b'\x11\x12\x13ab'
  512. 可以打印的字符直接以字符表示,不行的用\x加code
  513. :param hex_code:
  514. :return:
  515. """
  516. return bytes.fromhex(hex_code)
  517. def byte_array_to_hex_code(byte_array):
  518. """
  519. 使用方法
  520. byte_array_to_hex_code(b"hello") -- bytes can only contain ASCII literal characters.
  521. byte_array_to_hex_code("hello中文".encode())
  522. :param byte_array:
  523. :return:
  524. """
  525. # 使用 binascii.hexlify 将字节数组转换为 hex code
  526. hex_code = binascii.hexlify(byte_array).decode()
  527. return hex_code
  528. def is_base64(input_str):
  529. try:
  530. base64.b64decode(input_str)
  531. return True
  532. except ValueError:
  533. return False
  534. def base64_encode(data):
  535. """
  536. 传入的参数可以是 str 或者 byte array格式
  537. encode --- str to byte array
  538. decode --- byte array to str
  539. :param data:
  540. :return:
  541. """
  542. if isinstance(data, str):
  543. data = data.encode()
  544. # 进行 Base64 编码
  545. encoded_data = base64.b64encode(data).decode()
  546. return encoded_data
  547. def base64_decode(data):
  548. """
  549. 解码后,如果转化为字符串就返回字符串,否则就返回byte[]
  550. :param data:
  551. :return:
  552. """
  553. # 尝试解码成字符串
  554. try:
  555. decoded_str = base64.b64decode(data).decode()
  556. return decoded_str
  557. except UnicodeDecodeError:
  558. # 解码成字符串失败,返回字节数组
  559. decoded_bytes = base64.b64decode(data)
  560. return decoded_bytes
  561. except Exception:
  562. return None
  563. def get_files_in_dir(directory, extensions=None, include_subdir=True):
  564. """
  565. 获取目录下的所有文件。
  566. 参数:
  567. - directory: 目标目录的路径。
  568. - extensions: 文件后缀过滤列表,例如 ['.txt', '.pdf'],默认为 None。
  569. - include_subdir: 是否遍历子目录,True 为遍历,False 为不遍历,默认为 True。
  570. 返回:
  571. 包含所有文件路径的列表。
  572. """
  573. files = []
  574. extensions = tuple(extensions) if extensions else None
  575. def is_valid_file(filename):
  576. return extensions is None or filename.endswith(extensions)
  577. if include_subdir:
  578. # 遍历目录及其子目录
  579. for root, _, filenames in os.walk(directory):
  580. for filename in filenames:
  581. if is_valid_file(filename):
  582. files.append(os.path.join(root, filename))
  583. else:
  584. # 不遍历子目录,直接获取目录下的文件列表
  585. files = [os.path.join(directory, filename) for filename in os.listdir(directory)
  586. if os.path.isfile(os.path.join(directory, filename)) and is_valid_file(filename)]
  587. return files