sql_util.py 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. from sqlalchemy import create_engine, text
  2. from sqlalchemy.orm import sessionmaker
  3. from tools.config_db import DB_CONFIG
  4. """
  5. 数据库操作工具类
  6. """
  7. class sql_util:
  8. """
  9. Singleton class for managing MySQL database connection and operations
  10. """
  11. _instance = None
  12. def __new__(cls, *args, **kwargs):
  13. if not cls._instance:
  14. cls._instance = super(sql_util, cls).__new__(cls, *args, **kwargs)
  15. return cls._instance
  16. def __init__(self):
  17. self.engine = create_engine(
  18. f"mysql+mysqlconnector://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['db_name']}",
  19. max_overflow=0,
  20. pool_size=5,
  21. pool_timeout=30,
  22. pool_recycle=-1
  23. )
  24. self.SessionFactory = sessionmaker(bind=self.engine)
  25. self.session = self.SessionFactory()
  26. def execute_query(self, query, data=None):
  27. result = self.session.execute(text(query), data)
  28. return result
  29. def add_data(self, table_name, data):
  30. columns = ', '.join(data.keys())
  31. values = ', '.join([':%s' % key for key in data.keys()])
  32. query = f"INSERT INTO {table_name} ({columns}) VALUES ({values})"
  33. self.execute_query(query, data)
  34. self.session.commit()
  35. def delete_data(self, table_name, condition):
  36. query = f"DELETE FROM {table_name} WHERE {condition}"
  37. self.execute_query(query)
  38. self.session.commit()
  39. def update_data(self, table_name, condition, data):
  40. set_values = ', '.join([f"{key} = {data.get(key)}" for key in data.keys()])
  41. query = f"UPDATE {table_name} SET {set_values} WHERE {condition}"
  42. print(f'update sql: {query}')
  43. self.execute_query(query, data)
  44. self.session.commit()
  45. def search_data(self, table_name, columns=None, condition=None):
  46. column_str = '*' if columns is None else ', '.join(columns)
  47. query = f"SELECT {column_str} FROM {table_name}"
  48. if condition:
  49. query += f" WHERE {condition}"
  50. result = self.execute_query(query)
  51. return result.fetchall()
  52. def close_session(self):
  53. self.session.close()