"""Database helper utilities built on SQLAlchemy."""

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

from tools.config_db import DB_CONFIG


class sql_util:
    """Singleton wrapper around one MySQL engine and one shared session.

    Every ``sql_util()`` call returns the same object. The engine, session
    factory and session are created once, on first construction, from the
    values in ``DB_CONFIG``.

    Table names, column names and ``condition`` strings are interpolated
    into the SQL text as-is, so they must come from trusted code. Row
    values are always sent as bound parameters.
    """

    _instance = None

    def __new__(cls, *args, **kwargs):
        """Return the cached instance, creating it on the first call."""
        # object.__new__ rejects extra arguments, so none are forwarded.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        """Create the engine and session the first time only."""
        # Python calls __init__ on every sql_util() even though __new__ hands
        # back the cached object; without this guard each call would build a
        # new engine/pool and orphan the previous session.
        if getattr(self, "_initialized", False):
            return

        url = (
            f"mysql+mysqlconnector://{DB_CONFIG['user']}:{DB_CONFIG['password']}"
            f"@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['db_name']}"
        )
        # NOTE(review): pool_recycle=-1 disables connection recycling; confirm
        # this is compatible with the server's idle-connection timeout.
        self.engine = create_engine(
            url,
            max_overflow=0,
            pool_size=5,
            pool_timeout=30,
            pool_recycle=-1,
        )
        self.SessionFactory = sessionmaker(bind=self.engine)
        self.session = self.SessionFactory()
        self._initialized = True

    def execute_query(self, query, data=None):
        """Execute a raw SQL string on the shared session.

        Args:
            query: SQL text; may contain ``:name`` placeholders.
            data: Optional mapping of placeholder names to values.

        Returns:
            The result object returned by ``Session.execute``. Nothing is
            committed here.
        """
        return self.session.execute(text(query), data)

    def _run_and_commit(self, query, data=None):
        """Execute a write statement and commit, rolling back on failure."""
        try:
            self.execute_query(query, data)
            self.session.commit()
        except Exception:
            # Leave the shared session usable for later calls, then let the
            # caller see the original error.
            self.session.rollback()
            raise

    def add_data(self, table_name, data):
        """Insert one row.

        Args:
            table_name: Target table (trusted identifier).
            data: Mapping of column name to value; values are bound as
                parameters.
        """
        columns = ', '.join(data)
        placeholders = ', '.join(f":{key}" for key in data)
        query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
        self._run_and_commit(query, data)

    def delete_data(self, table_name, condition):
        """Delete the rows matching ``condition``.

        Args:
            table_name: Target table (trusted identifier).
            condition: Raw SQL placed after ``WHERE`` (trusted text).
        """
        query = f"DELETE FROM {table_name} WHERE {condition}"
        self._run_and_commit(query)

    def update_data(self, table_name, condition, data):
        """Update the rows matching ``condition``.

        Values are sent as bound parameters rather than being formatted into
        the statement, so strings, ``None`` and other types are handled by
        the driver. A value is therefore always treated as data, never as a
        SQL expression.

        Args:
            table_name: Target table (trusted identifier).
            condition: Raw SQL placed after ``WHERE`` (trusted text).
            data: Mapping of column name to new value.
        """
        assignments = ', '.join(f"{key} = :{key}" for key in data)
        query = f"UPDATE {table_name} SET {assignments} WHERE {condition}"
        print(f'update sql: {query}')
        self._run_and_commit(query, data)

    def search_data(self, table_name, columns=None, condition=None):
        """Select rows from a table.

        Args:
            table_name: Source table (trusted identifier).
            columns: Iterable of column names, or ``None`` for ``*``.
            condition: Optional raw SQL placed after ``WHERE``.

        Returns:
            All matching rows, as returned by ``fetchall()``.
        """
        column_str = '*' if columns is None else ', '.join(columns)
        query = f"SELECT {column_str} FROM {table_name}"
        if condition:
            query += f" WHERE {condition}"
        return self.execute_query(query).fetchall()

    def close_session(self):
        """Close the shared session."""
        self.session.close()