# update: 2023-11-12-19 """ pip install pymysql==0.9.3 -i https://pypi.tuna.tsinghua.edu.cn/simple pip install SQLAlchemy==1.4.30 -i https://pypi.tuna.tsinghua.edu.cn/simple 问题: CryptographyDeprecationWarning: Python 3.6 is no longer supported by the Python core team. Therefore, support for it is deprecated in cryptography. The next release of cryptography will remove support for Python 3.6. from cryptography.hazmat.backends import default_backen 解决: pip3 install cryptography==3.4.8 see: https://blog.csdn.net/weixin_46281427/article/details/122916870 # --- 解决root密码中包含@的问题 self.mysql_dwd_config = { 'drivername': 'mysql+pymysql', 'username': 'user_a', 'password': 'xxx@#$xxx', 'host': 'am-xxxxx.ads.aliyuncs.com', 'port': 3306, } if sqlalchemy.__version__ >= '1.4': #其实大于1.4.15之后,密码里面含有@,就必须以这种方式创建正确正则识别密码的引擎了。 self.mysql_engine_url = sqlalchemy.engine.URL.create(**self.mysql_dwd_config) self.mysql_engine_url = self.mysql_engine_url.update_query_dict({'charset': 'utf8mb4'}) else: # password 含有@ self.mysql_engine_url = '{drivername}://{username}:{password}@{host}:{port}/?charset=utf8mb4'.format(**self.mysql_dwd_config) self.mysql_dwd_engine = sqlalchemy.create_engine(self.mysql_engine_url) """ from sqlalchemy import create_engine, inspect, MetaData, Table from sqlalchemy.orm import sessionmaker class Client(): def __init__(self, host='127.0.0.1', port=3306, database='ar', username='root', password='20221212!'): """ """ # 设置字符集 args = 'charset=utf8mb4' # 设置关闭 only_full_group_by 模式 args = '&sql_mode=STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION' self.engine = create_engine(f"mysql+pymysql://{username}:{password}@{host}:{port}/{database}?{args}") # self.engine = create_engine(f"mysql+pymysql://{username}:{password}@{host}:{port}/{database}") self.database = database self.inspect = inspect(self.engine) def get_all_database(self): """获取全部数据库名称""" return self.inspect.get_schema_names() def get_all_table(self, database_name): """获取全部数据表名称""" return self.inspect.get_table_names(schema=database_name) def get_all_field(self, table_name): """获取全部字段名称""" fields = self.inspect.get_columns(table_name, schema=self.database) # 表名,库名 return [i.get('name') for i in fields] def execute_sql(self, sql): """ 执行sql """ try: result = self.engine.execute(sql).fetchone()[0] # print(result.__class__.__name__) return result except Exception as exception: print(exception.__class__.__name__) return None def get_all(self, table_name): """ 获取全部数据 """ sql = f"SELECT * FROM {table_name};" try: result = self.engine.execute(sql).fetchall() # 返回的是列表 # print(result.__class__.__name__) return result except Exception as exception: print(exception.__class__.__name__) return None def get_one(self, table_name='ar_user', where_key='ACCT', where_val='admin'): """ 单条获取 """ sql = f"SELECT * FROM {table_name} WHERE {where_key} = '{where_val}';" try: result = self.engine.execute(sql).fetchone() # 返回的是元祖 # result = self.engine.execute(sql).fetchall() # 返回的是列表 # print(result.__class__.__name__) return result except Exception as exception: print(exception.__class__.__name__) return None if __name__ == '__main__': # --- init --- # db = Client(host='127.0.0.1', port=3306, database='ar', username='root', password='20221212!') db = Client(host='127.0.0.1', port=3306, database='ar', username='root', password='rootroot&123123') # db = Client(host='58.34.94.176', port=8806, database='ar', username='root', password='rootroot&123123') # --- test --- # result = db.get_all_field('ar_user') # result = db.get_all('ar_phone_call') result = db.get_one('ar_user', 'USER_ID', '10000LXQ') print(result) print(result[1]) # --- test --- # sql = f'''select a.USER_ID, a.USER_NAME, a.EMP_NO, a.PHOTO, a.PHONE, c.DEPT_ID, c.DEPT_NAME, e.ROLE_ID, e.ROLE_NAME, a.IS_DISABLED # from ar_user a left JOIN ar_user_dept_rel b ON a.USER_ID = b.USER_ID # left join ar_dept c on c.DEPT_ID = b.DEPT_ID # left join ar_user_role_rel d on a.USER_ID = d.USER_ID # left join ar_role e on d.ROLE_ID = e.ROLE_ID # where a.ACCT = 'admin' # ''' # sql = f'''select a.USER_ID, a.USER_NAME, a.EMP_NO, a.PHOTO, a.PHONE, c.DEPT_ID, c.DEPT_NAME, e.ROLE_ID, e.ROLE_NAME, a.IS_DISABLED # from AR_USER a left JOIN AR_USER_DEPT_REL b ON a.USER_ID = b.USER_ID # left join AR_DEPT c on c.DEPT_ID = b.DEPT_ID # left join AR_USER_ROLE_REL d on a.USER_ID = d.USER_ID # left join AR_ROLE e on d.ROLE_ID = e.ROLE_ID # where a.ACCT = 'admin' # ''' # out = db.engine.execute(sql).fetchone() # print(out) # --- test --- # result = db.get_all_field('ar', 'ar_user') # print(result) # --- test --- # result = db.get_one() # result = db.get_one(table_name='ar_user', where_key='USER_NAME', where_val='陈旭') # result = db.get_one(table_name='ar_user', where_key='USER_NAME', where_val='admin') # print(result) # --- test --- # sql = f'''select count(1) from ar.ar_phone_call where status in (0, 1)''' # result = db.execute_sql(sql) # print(result) # --- test --- # items = db.get_all('vul_detail') # for item in items: # print(item.cve_id) # # print(item.vul_name) # --- test --- # out = db.get_one('vul_detail', {'cve_id': 'CVE-2007-1858'}) # print(type(out.get('cvss_point'))) # print(out.get('cvss_point'))