import ctypes import platform import subprocess import sys import os import threading import time from datetime import datetime from PyQt5.QtCore import Qt, QUrl, QDateTime, QThread, pyqtSignal, pyqtSlot from PyQt5.QtWidgets import QApplication, QMainWindow, QLabel, QFileDialog, QMessageBox, QDateTimeEdit, QPushButton from PyQt5.QtGui import QDesktopServices from app import login_account, run_threading_order, run_daily_job, run_manual_job, cancel_manual_job, stop_order_job, \ write_ticket_info from litter_helper import Ui_menu # 确保这个导入路径是正确的 from tools.utils import save_all_proxy_ip_v3 import winreg from pathlib import Path # 定义 Redis 安装程序路径 REDIS_INSTALLER_PATH = "Redis-x64-5.0.14.1.msi" # 替换为实际的安装程序路径 class ClickableLabel(QLabel): def __init__(self, parent=None, file_path=None): super().__init__(parent) self.file_path = file_path def mousePressEvent(self, event): if event.button() == Qt.LeftButton: self.emit_click_event() def emit_click_event(self): if self.file_path: self.parent().on_label_click(self.file_path) def get_resource_path(relative_path): """获取资源文件的绝对路径""" if getattr(sys, 'frozen', False): # 是否为PyInstaller打包的exe文件 # 返回exe文件所在的绝对路径 base_path = os.path.dirname(sys.executable) else: # 在开发环境下运行 # 返回脚本文件所在的绝对路径 base_path = os.path.dirname(__file__) return os.path.join(base_path, relative_path) def get_dir_path(relative_path): """获取资源文件路径""" base_path = getattr(sys, '_MEIPASS', str(Path(__file__).parent.absolute())) return Path(base_path, relative_path) # 停止定时任务 def on_stop_time_button_click(): print(f"停止定时任务: {time.time()}") cancel_manual_job() # 写入文件 def on_write_file_button_click(): print(f"写入文件:{time.time()}") write_ticket_info("abcd", "1234556") class WorkerThread(QThread): """用于执行下单逻辑的线程""" signal = pyqtSignal(str) # 定义一个信号,用于将信息传递回主线程 def __init__(self, thread_num, accounts_file, net_url): super().__init__() self.thread_num = thread_num self.accounts_file = accounts_file self.net_url = net_url def run(self): try: run_threading_order(self.thread_num, self.accounts_file, self.net_url) self.signal.emit("下单完成") # 发送信号,通知主线程下单完成 except Exception as e: self.signal.emit(f"下单失败: {e}") # 发送信号,通知主线程下单失败 class LoginThread(QThread): """用于执行登录逻辑的线程""" signal = pyqtSignal(str) # 定义一个信号,用于将信息传递回主线程 def __init__(self, thread_num, accounts_file, pocket_url): super().__init__() self.thread_num = thread_num self.accounts_file = accounts_file self.pocket_url = pocket_url def run(self): try: login_account(self.thread_num, self.accounts_file, self.pocket_url) self.signal.emit("登录成功") # 发送信号,通知主线程登录成功 except Exception as e: self.signal.emit(f"登录失败: {e}") # 发送信号,通知主线程登录失败 class MainWindow(QMainWindow, Ui_menu): def __init__(self, param1, param2): super().__init__() self.setupUi(self) # 使用传递的参数 self.param1 = param1 self.param2 = param2 # 设置标签的文本为传递的参数 # self.label.setText(f"参数1: {self.param1}, 参数2: {self.param2}") self.concurrencyLineEdit.setText("1") self.urlLineEdit.setText("https://t.livepocket.jp/e/l3im7") # 登录账号 self.loginPushButton.clicked.connect(self.on_login_button_click) # 开始下单 self.startPushButton.clicked.connect(self.on_start_button_click) # 停止下单 self.stopOrderPushButton.clicked.connect(self.on_stop_button_click) # 定时开始 self.startTimePushButton.clicked.connect(self.on_start_time_button_click) # 停止定时任务 self.stopTimePushButton.clicked.connect(on_stop_time_button_click) # 写入文件 # self.writeFilePushButton.clicked.connect(on_write_file_button_click) # 编辑账号 # 获取 QLabel 控件并替换为 ClickableLabel self.accountsLabel = self.findChild(QLabel, "accountsLabel") if self.accountsLabel: # 替换为 ClickableLabel self.clickableAccountsLabel = ClickableLabel(self, file_path=get_resource_path("accounts.txt")) self.clickableAccountsLabel.setObjectName("accountsLabel") self.clickableAccountsLabel.setText("编辑账号") self.clickableAccountsLabel.setGeometry(self.accountsLabel.geometry()) self.clickableAccountsLabel.show() self.accountsLabel.deleteLater() # 删除原来的 QLabel 控件 # 编辑IP代理池 # 获取 QLabel 控件并替换为 ClickableLabel self.ipPoolLabel = self.findChild(QLabel, "ipPoolLabel") if self.ipPoolLabel: # 替换为 ClickableLabel self.clickableIpPoolLabel = ClickableLabel(self, file_path=get_resource_path("proxy_list.txt")) self.clickableIpPoolLabel.setObjectName("ipPoolLabel") self.clickableIpPoolLabel.setText("编辑代理池") self.clickableIpPoolLabel.setGeometry(self.ipPoolLabel.geometry()) self.clickableIpPoolLabel.show() self.ipPoolLabel.deleteLater() # 删除原来的 QLabel 控件 # 获取 QDateTimeEdit 控件并设置显示格式 self.startDateTimeEdit = self.findChild(QDateTimeEdit, "startDateTimeEdit") if self.startDateTimeEdit: self.startDateTimeEdit.setDisplayFormat("yyyy-MM-dd HH:mm:ss") # 设置当前时间 self.startDateTimeEdit.setDateTime(QDateTime.currentDateTime()) # 获取 QDateTimeEdit 控件并设置显示格式 self.endDateTimeEdit = self.findChild(QDateTimeEdit, "endDateTimeEdit") if self.endDateTimeEdit: self.endDateTimeEdit.setDisplayFormat("yyyy-MM-dd HH:mm:ss") # 设置当前时间 self.endDateTimeEdit.setDateTime(QDateTime.currentDateTime()) # 连接 watchPushButton 点击事件到槽函数 self.watchPushButton = self.findChild(QPushButton, "watchPushButton") if self.watchPushButton: self.watchPushButton.clicked.connect(self.on_watch_button_click) # 检查 Redis 是否安装 self.check_redis_installation() def closeEvent(self, event): """重写 closeEvent 方法,在窗口关闭时终止程序""" reply = QMessageBox.question(self, '退出', "确定要退出吗?", QMessageBox.Yes | QMessageBox.No, QMessageBox.No) if reply == QMessageBox.Yes: event.accept() # 接受关闭事件 sys.exit() # 退出应用程序 else: event.ignore() # 忽略关闭事件 def on_label_click(self, file_path): if os.path.exists(file_path): try: QDesktopServices.openUrl(QUrl.fromLocalFile(file_path)) except Exception as e: QMessageBox.critical(self, "Error", f"无法打开文件: {e}") else: QMessageBox.critical(self, "Error", f"文件不存在: {file_path}") def on_start_button_click(self): print(f"开始下单!") # 在按钮点击时执行的操作 # print(f"按钮被点击了!参数1: {self.param1}, 参数2: {self.param2}") # 获取 QDateTimeEdit 控件的值 if self.startDateTimeEdit: date_time_value = self.startDateTimeEdit.dateTime() # print(f"选中的日期和时间: {date_time_value.toString('yyyy-MM-dd HH:mm:ss')}") # 获取 QDateTimeEdit 控件的值 if self.endDateTimeEdit: date_time_value = self.endDateTimeEdit.dateTime() # print(f"选中的日期和时间: {date_time_value.toString('yyyy-MM-dd HH:mm:ss')}") # 获取并发数 thread_num = 1 if self.concurrencyLineEdit: # print(f"并发数为: {self.concurrencyLineEdit.text()}") thread_num = self.concurrencyLineEdit.text() # url net_url = None if self.urlLineEdit: net_url = self.urlLineEdit.text() # 创建线程并启动 self.worker_thread = WorkerThread(thread_num, get_resource_path("accounts.txt"), net_url) self.worker_thread.signal.connect(self.on_worker_thread_signal) self.worker_thread.start() # 登录账号 def on_login_button_click(self): print(f"登录账号! {self.urlLineEdit.text()}") thread_num = 1 if self.concurrencyLineEdit.text(): thread_num = self.concurrencyLineEdit.text() pocket_url = None if self.urlLineEdit.text(): pocket_url = self.urlLineEdit.text() save_all_proxy_ip_v3(get_resource_path("proxy_list.txt")) # 创建登录线程 self.login_thread = LoginThread(thread_num, get_resource_path("accounts.txt"), pocket_url) self.login_thread.signal.connect(self.on_login_thread_signal) self.login_thread.start() else: QMessageBox.warning(self, '警告', '请输入URL') return # 定时开始 def on_start_time_button_click(self): print(f"定时开始:{time.time()}") net_url = None if self.urlLineEdit: net_url = self.urlLineEdit.text() start_time = self.startDateTimeEdit.text() print(f"start_time:{start_time}") # 使用datetime模块解析时间字符串 dt = datetime.strptime(start_time, '%Y-%m-%d %H:%M:%S') # 提取出时、分、秒 hour = dt.hour minute = dt.minute second = dt.second print(f"Hour: {hour}, Minute: {minute}, Second: {second}") # 获取并发数 thread_num = self.concurrencyLineEdit.text() # 创建一个新的线程来运行 run_manual_job 函数 thread = threading.Thread(target=run_manual_job, args=(hour, minute, thread_num, net_url)) thread.start() else: QMessageBox.warning(self, '警告', '请输入URL') return # 终止下单 def on_stop_button_click(self): print(f"终止下单! 目标网址: {self.urlLineEdit.text()}") stop_order_job() def edit_accounts(self): # 编辑账号 print(f"编辑账号: {self.urlLineEdit.text()}") # 查看结果 def on_watch_button_click(self): # 获取项目当前路径 current_path = sys.path[0] # 拼接文件夹路径 folder_path = get_dir_path("results") # 假设文件夹名为 "results" # 判断文件夹是否存在,不存在则创建 if not folder_path.exists(): try: folder_path.mkdir(parents=True, exist_ok=True) print(f"文件夹 {folder_path} 已创建") except Exception as e: QMessageBox.critical(self, "Error", f"无法创建文件夹: {e}") return # 打开文件夹 folder_path = get_resource_path("results") # 假设文件夹名为 "results" try: # 使用 QDesktopServices.openUrl 打开文件夹 QDesktopServices.openUrl(QUrl.fromLocalFile(folder_path)) except Exception as e: QMessageBox.critical(self, "Error", f"无法打开文件夹: {e}") # 检查 Redis 是否安装 def check_redis_installation(self): # 检查操作系统 system = platform.system() if system == "Windows": # 检查 Redis 服务是否正在运行 try: key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, r"SYSTEM\CurrentControlSet\Services\Redis") winreg.CloseKey(key) print("Redis 服务已安装并正在运行") return except FileNotFoundError as ex: print(f"Redis 服务未安装或未运行") # 如果 Redis 服务未安装,则启动安装程序 if REDIS_INSTALLER_PATH: try: subprocess.run([REDIS_INSTALLER_PATH], shell=True) print("Redis 安装程序已启动") except Exception as e: QMessageBox.critical(self, "Error", f"无法启动 Redis 安装程序: {e}") else: # 在其他操作系统上,可以使用不同的方法检查 Redis 是否安装 # 例如,可以使用 `which redis-server` 命令 print("当前操作系统不支持自动安装 Redis") # 处理线程信号 @pyqtSlot(str) def on_worker_thread_signal(self, message): QMessageBox.information(self, "下单结果", message) # 处理登录线程信号 @pyqtSlot(str) def on_login_thread_signal(self, message): QMessageBox.information(self, "登录结果", message) if __name__ == "__main__": app = QApplication(sys.argv) # 创建窗口并传递参数 window = MainWindow("Hello", "World") window.show() sys.exit(app.exec_())