個人筆記,利用python call api及DB操作
import requests
from requests.auth import HTTPBasicAuth
from Class.ExecutionResult import ExecutionResult
class APIClient:
def __init__(self, base_url, userid=None, pwd=None,proxies=None):
self.base_url = base_url
self.auth = HTTPBasicAuth(userid, pwd) if userid and pwd else None
self.proxies = proxies
def post(self, endpoint, params=None, headers=None
, verify=False, cls=None):
result = ExecutionResult()
if endpoint:
url = f"{self.base_url}/{endpoint}"
else:
url = self.base_url
try:
response = requests.post(url, auth=self.auth, json=params
, headers=headers, verify=verify, proxies=self.proxies)
result = self.handle_response(response, cls,result)
except Exception as e:
result.set_failure(error = e, message = "Failed to post!")
return result
def get(self, endpoint, params=None, headers=None
, verify=False, cls=None):
result = ExecutionResult()
if endpoint:
url = f"{self.base_url}/{endpoint}"
else:
url = self.base_url
try:
response = requests.get(url, auth=self.auth
, headers=headers, params=params
, verify=verify
, proxies=self.proxies
)
result = self.handle_response(response, cls,result)
except Exception as e:
result.set_failure(error = e, message = "Failed to get!")
return result
def put(self, endpoint, params=None, headers=None, verify=False, cls=None):
result = ExecutionResult()
url = f"{self.base_url}/{endpoint}" if endpoint else self.base_url
try:
response = requests.put(url, auth=self.auth
, json=params
, headers=headers
, verify=verify
, proxies=self.proxies
)
result = self.handle_response(response, cls, result)
except Exception as e:
result.set_failure(error=e, message="Failed to put!")
return result
def delete(self, endpoint, params=None, headers=None, verify=False, cls=None):
result = ExecutionResult()
url = f"{self.base_url}/{endpoint}" if endpoint else self.base_url
try:
response = requests.delete(url, auth = self.auth
, json = params
, headers = headers
, verify = verify
, proxies = self.proxies
)
result = self.handle_response(response, cls, result)
except Exception as e:
result.set_failure(error=e, message="Failed to delete!")
return result
def handle_response(self, response, cls,result):
if response.status_code == 200:
getResponse = response.json()
code = getResponse.get('Code')
msg = getResponse.get('Msg')
data = getResponse.get('Data')
if code==200:
if cls:
data = self.parse_data_as_class(data,cls)
result.set_success(message = "get post/get successfully!!", data = {"code":code,"msg":msg,"data":data})
else:
result.set_failure(error=None, message=msg)
else:
# response.raise_for_status()
result.set_failure(error=None, message="Failed to delete!")
return result
@staticmethod
def parse_data_as_class(data, cls):
"""
将数据解析为指定类的实例列表
:param data: list or dict, 包含需要解析的数据
:param cls: class, 具有 from_dict 方法的类
:return: list, 包含指定类的实例的列表
"""
data_list = []
# print("data->",data)
if isinstance(data, list):
try:
# 将列表中的每个字典转换为指定类的实例
data_list = [cls.from_dict(item) for item in data]
# print("data_list->",data_list)
except (ValueError, TypeError, KeyError):
data_list = []
elif isinstance(data, dict):
try:
# 将字典转换为单个指定类的实例并放入列表中
data_list = [cls.from_dict(data)]
except (ValueError, TypeError, KeyError):
data_list = []
return data_list
DB操作
import pyodbc
import pandas as pd
import re
from Class.ExecutionResult import ExecutionResult
class DBUtil:
def __init__(self, connectionString):
self.connectionString = connectionString
def getEQRConnection(self):
result = ExecutionResult()
try:
conn = pyodbc.connect(self.connectionString)
result.set_success(message = "get connection successfully!!", data = conn)
except Exception as e:
result.set_failure(error = e, message = "Failed to get connection!")
return result
def getConnection(self,connectionString):
'''
連線字串
'''
result = ExecutionResult()
try:
conn = pyodbc.connect(connectionString)
result.set_success(message = "get connection successfully!!", data = conn)
except Exception as e:
result.set_failure(error = e, message = "Failed to get connection!")
return result
def closeConnection(self,conn):
'''
關閉Connection
'''
try:
conn.close()
except Exception as e:
print("close db error:",e)
def execute_query(self, sql_query,conn=None):
"""
Execute a SQL query on the database.
"""
result = ExecutionResult()
try:
# print(self.connectionString)
if conn==None:
conn = pyodbc.connect(self.connectionString)
# print("aaaaaaaaa")
data = pd.read_sql_query(sql_query,conn)
result.set_success(message = "Exec command successfully!!", data = data)
conn.close()
except Exception as e:
result.set_failure(error = e, message = "Exec command failed!")
return result
def prepare_commandstring(self,query_template, params):
"""
准备带有命名参数的 SQL 查询。
:param query_template: 带有命名参数的查询模板
:param params: 参数字典
:return: (最终的查询字符串, 参数元组)
SELECT * FROM your_table_name
WHERE column1 = {param1} AND column2 = {param2} OR column3 = {param1}
"""
# 使用字符串格式化生成最终的查询
query = query_template.format(**{key: '?' for key in params.keys()})
# 使用正则表达式找到所有的占位符
param_order = re.findall(r'\{(\w+)\}', query_template)
# print(param_order)
# 生成参数元组,确保每个占位符都有对应的参数值
param_values = tuple(params[key] for key in param_order)
# 返回最终的查询字符串和参数元组
return query, param_values
def bulk_insert_or_update(self, df, table_name, key_columns
, update_columns=None,conn=None
, null_check_columns=None):
"""
批量插入或更新数据
:param df: pandas DataFrame, 包含需要插入或更新的数据
:param table_name: str, 数据库表名,包含 schema 信息
:param key_columns: list, 用于判断是插入还是更新的关键列
:param update_columns: list, 需要更新的列,如果为 None,则更新所有列
"""
"""
範例
result = ExecutionResult()
db = DBUtil("DRIVER={SQL Server}; SERVER=TWTP1APM40\\SQLEXPRESS; DATABASE=HRSDB; UID=jojo; PWD=633605jojo")
data = {
"empNo":["Axxx","xxooxxo"]
,"empName":['陳小Jo',"HaHaa"]
}
df = pd.DataFrame(data)
result=db.bulk_insert_or_update(df, "dbo.empTest", key_columns=["empNo"], update_columns=["empName"])
"""
result = ExecutionResult()
try:
if conn==None:
conn = pyodbc.connect(self.connectionString)
cursor = conn.cursor()
# 创建临时表
temp_table_name = f"#{table_name.replace('.', '_')}_tempforpy"
create_temp_table_sql = f"SELECT TOP 0 * INTO {temp_table_name} FROM {table_name}"
cursor.execute(create_temp_table_sql)
conn.commit()
# 将数据插入临时表
insert_temp_sql = f"INSERT INTO {temp_table_name} ({', '.join(df.columns)}) VALUES ({', '.join(['?' for _ in df.columns])})"
cursor.executemany(insert_temp_sql, df.values.tolist())
conn.commit()
# 如果 update_columns 为 None,则更新所有列
if update_columns is None:
update_columns = [col for col in df.columns if col not in key_columns]
on_conditions = [f"ISNULL(target.{col}, '') = ISNULL(source.{col}, '')" for col in key_columns]
if null_check_columns:
on_conditions += [f"target.{col} IS NULL" for col in null_check_columns]
# 合并数据
merge_sql = f"""
MERGE INTO {table_name} AS target
USING {temp_table_name} AS source
ON {" AND ".join(on_conditions)}
WHEN MATCHED AND {" OR ".join([f"ISNULL(target.{col},'') <> ISNULL(source.{col},'')" for col in update_columns])} THEN
UPDATE SET {", ".join([f"target.{col} = source.{col}" for col in update_columns])}, target.MODIFY_TIME = GETDATE()
WHEN NOT MATCHED THEN
INSERT ({', '.join(df.columns)}, CREATE_TIME) VALUES ({', '.join([f"source.{col}" for col in df.columns])}, GETDATE());
"""
# merge_sql = f"""
# MERGE INTO {table_name} AS target
# USING {temp_table_name} AS source
# ON {" AND ".join([f"ISNULL(target.{col},'') = ISNULL(source.{col},'')" for col in key_columns])}
# WHEN MATCHED AND {" OR ".join([f"ISNULL(target.{col},'') <> ISNULL(source.{col},'')" for col in update_columns])} THEN
# UPDATE SET {", ".join([f"target.{col} = source.{col}" for col in update_columns])}, target.MODIFY_TIME = GETDATE()
# WHEN NOT MATCHED THEN
# INSERT ({', '.join(df.columns)}, CREATE_TIME) VALUES ({', '.join([f"source.{col}" for col in df.columns])}, GETDATE());
# """
# print(merge_sql)
cursor.execute(merge_sql)
conn.commit()
# 删除临时表
drop_temp_table_sql = f"DROP TABLE {temp_table_name}"
cursor.execute(drop_temp_table_sql)
conn.commit()
cursor.close()
conn.close()
result.set_success(message = "bulk insert / update successfully !!!", data = None)
except Exception as e:
result.set_failure(error = e, message = "bulk insert / update failed !!!")
return result
def bulk_insert(self, df, table_name,conn=None):
"""
批量插入数据
:param df: pandas DataFrame, 包含需要插入的数据
:param table_name: str, 数据库表名
"""
"""
範例
result = ExecutionResult()
db = DBUtil("DRIVER={SQL Server}; SERVER=TWTP1APM40\\SQLEXPRESS; DATABASE=HRSDB; UID=jojo; PWD=633605jojo")
data = {
"empNo":["AB404","AB305"]
,"empName":['王大明',"陳小胖"]
}
df = pd.DataFrame(data)
result=db.bulk_insert(df, "dbo.empTest")
"""
result = ExecutionResult()
try:
if conn == None:
conn = pyodbc.connect(self.connectionString)
cursor = conn.cursor()
# 构建插入 SQL 语句
columns = ', '.join(df.columns) + ', updatetime'
placeholders = ', '.join(['?' for _ in df.columns]) + ', GETDATE()'
insert_sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
# 批量插入数据
cursor.executemany(insert_sql, df.values.tolist())
conn.commit()
cursor.close()
conn.close()
result.set_success(message = "bulk insert successfully !!!", data = None)
except Exception as e:
result.set_failure(error = e, message = "bulk insert failed !!!")
return result
def query_to_dataframe(self, table_name, columns, conditions = None,conn = None):
if conn == None:
conn = pyodbc.connect(self.connectionString)
cursor = conn.cursor()
# 构建查询语句
query = f"SELECT {', '.join(columns)} FROM {table_name}"
if conditions:
query += f" WHERE {' AND '.join(conditions)}"
# 执行查询并获取结果
df = pd.read_sql_query(query, conn)
conn.close()
return df
打雜打久了,就變成打雜妹
程式寫久了,就變成老乞丐