123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 |
- # update: 2023-11-12-19
- """
- pip install pymysql==0.9.3 -i https://pypi.tuna.tsinghua.edu.cn/simple
- pip install SQLAlchemy==1.4.30 -i https://pypi.tuna.tsinghua.edu.cn/simple
- 问题:
- CryptographyDeprecationWarning: Python 3.6 is no longer supported by the Python core team. Therefore, support for it is deprecated in cryptography. The next release of cryptography will remove support for Python 3.6.
- from cryptography.hazmat.backends import default_backen
- 解决:
- pip3 install cryptography==3.4.8
- see: https://blog.csdn.net/weixin_46281427/article/details/122916870
- # --- 解决root密码中包含@的问题
- self.mysql_dwd_config = {
- 'drivername': 'mysql+pymysql',
- 'username': 'user_a',
- 'password': 'xxx@#$xxx',
- 'host': 'am-xxxxx.ads.aliyuncs.com',
- 'port': 3306,
- }
- if sqlalchemy.__version__ >= '1.4': #其实大于1.4.15之后,密码里面含有@,就必须以这种方式创建正确正则识别密码的引擎了。
- self.mysql_engine_url = sqlalchemy.engine.URL.create(**self.mysql_dwd_config)
- self.mysql_engine_url = self.mysql_engine_url.update_query_dict({'charset': 'utf8mb4'})
- else:
- # password 含有@
- self.mysql_engine_url = '{drivername}://{username}:{password}@{host}:{port}/?charset=utf8mb4'.format(**self.mysql_dwd_config)
- self.mysql_dwd_engine = sqlalchemy.create_engine(self.mysql_engine_url)
- """
- from sqlalchemy import create_engine, inspect, MetaData, Table
- from sqlalchemy.orm import sessionmaker
- class Client():
- def __init__(self, host='127.0.0.1', port=3306, database='ar', username='root', password='20221212!'):
- """
- """
- # 设置字符集
- args = 'charset=utf8mb4'
- # 设置关闭 only_full_group_by 模式
- args = '&sql_mode=STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION'
- self.engine = create_engine(f"mysql+pymysql://{username}:{password}@{host}:{port}/{database}?{args}")
- # self.engine = create_engine(f"mysql+pymysql://{username}:{password}@{host}:{port}/{database}")
- self.database = database
- self.inspect = inspect(self.engine)
- def get_all_database(self):
- """获取全部数据库名称"""
- return self.inspect.get_schema_names()
- def get_all_table(self, database_name):
- """获取全部数据表名称"""
- return self.inspect.get_table_names(schema=database_name)
- def get_all_field(self, table_name):
- """获取全部字段名称"""
- fields = self.inspect.get_columns(table_name, schema=self.database) # 表名,库名
- return [i.get('name') for i in fields]
- def execute_sql(self, sql):
- """
- 执行sql
- """
- try:
- result = self.engine.execute(sql).fetchone()[0]
- # print(result.__class__.__name__)
- return result
- except Exception as exception:
- print(exception.__class__.__name__)
- return None
- def get_all(self, table_name):
- """
- 获取全部数据
- """
- sql = f"SELECT * FROM {table_name};"
- try:
- result = self.engine.execute(sql).fetchall() # 返回的是列表
- # print(result.__class__.__name__)
- return result
- except Exception as exception:
- print(exception.__class__.__name__)
- return None
- def get_one(self, table_name='ar_user', where_key='ACCT', where_val='admin'):
- """
- 单条获取
- """
- sql = f"SELECT * FROM {table_name} WHERE {where_key} = '{where_val}';"
- try:
- result = self.engine.execute(sql).fetchone() # 返回的是元祖
- # result = self.engine.execute(sql).fetchall() # 返回的是列表
- # print(result.__class__.__name__)
- return result
- except Exception as exception:
- print(exception.__class__.__name__)
- return None
- if __name__ == '__main__':
- # --- init ---
- # db = Client(host='127.0.0.1', port=3306, database='ar', username='root', password='20221212!')
- db = Client(host='127.0.0.1', port=3306, database='ar', username='root', password='rootroot&123123')
- # db = Client(host='58.34.94.176', port=8806, database='ar', username='root', password='rootroot&123123')
- # --- test ---
- # result = db.get_all_field('ar_user')
- # result = db.get_all('ar_phone_call')
- result = db.get_one('ar_user', 'USER_ID', '10000LXQ')
- print(result)
- print(result[1])
- # --- test ---
- # sql = f'''select a.USER_ID, a.USER_NAME, a.EMP_NO, a.PHOTO, a.PHONE, c.DEPT_ID, c.DEPT_NAME, e.ROLE_ID, e.ROLE_NAME, a.IS_DISABLED
- # from ar_user a left JOIN ar_user_dept_rel b ON a.USER_ID = b.USER_ID
- # left join ar_dept c on c.DEPT_ID = b.DEPT_ID
- # left join ar_user_role_rel d on a.USER_ID = d.USER_ID
- # left join ar_role e on d.ROLE_ID = e.ROLE_ID
- # where a.ACCT = 'admin'
- # '''
- # sql = f'''select a.USER_ID, a.USER_NAME, a.EMP_NO, a.PHOTO, a.PHONE, c.DEPT_ID, c.DEPT_NAME, e.ROLE_ID, e.ROLE_NAME, a.IS_DISABLED
- # from AR_USER a left JOIN AR_USER_DEPT_REL b ON a.USER_ID = b.USER_ID
- # left join AR_DEPT c on c.DEPT_ID = b.DEPT_ID
- # left join AR_USER_ROLE_REL d on a.USER_ID = d.USER_ID
- # left join AR_ROLE e on d.ROLE_ID = e.ROLE_ID
- # where a.ACCT = 'admin'
- # '''
- # out = db.engine.execute(sql).fetchone()
- # print(out)
- # --- test ---
- # result = db.get_all_field('ar', 'ar_user')
- # print(result)
- # --- test ---
- # result = db.get_one()
- # result = db.get_one(table_name='ar_user', where_key='USER_NAME', where_val='陈旭')
- # result = db.get_one(table_name='ar_user', where_key='USER_NAME', where_val='admin')
- # print(result)
- # --- test ---
- # sql = f'''select count(1) from ar.ar_phone_call where status in (0, 1)'''
- # result = db.execute_sql(sql)
- # print(result)
- # --- test ---
- # items = db.get_all('vul_detail')
- # for item in items:
- # print(item.cve_id)
- # # print(item.vul_name)
- # --- test ---
- # out = db.get_one('vul_detail', {'cve_id': 'CVE-2007-1858'})
- # print(type(out.get('cvss_point')))
- # print(out.get('cvss_point'))
|