[筆記] API VS DB操作

個人筆記:使用 Python 呼叫 API,以及進行資料庫(DB)操作的工具類別。

import requests
from requests.auth import HTTPBasicAuth
from Class.ExecutionResult import ExecutionResult

class APIClient:
    """Small HTTP client that reports every call through an ``ExecutionResult``.

    The verb methods never raise: any exception raised while sending the
    request or decoding the response is captured with ``set_failure`` on the
    returned ``ExecutionResult``.

    ``handle_response`` reads the keys ``'Code'``, ``'Msg'`` and ``'Data'``
    from the JSON body, so the remote service is expected to answer with a
    JSON object carrying those keys.
    """

    def __init__(self, base_url, userid=None, pwd=None, proxies=None, timeout=None):
        """Store connection settings.

        :param base_url: root URL that endpoints are appended to
        :param userid: user name for HTTP basic auth (used only with ``pwd``)
        :param pwd: password for HTTP basic auth (used only with ``userid``)
        :param proxies: proxies mapping forwarded to ``requests``
        :param timeout: timeout forwarded to ``requests``; ``None`` (the
            default) keeps the previous behavior of waiting indefinitely
        """
        self.base_url = base_url
        # Basic auth is only enabled when both credentials are supplied.
        self.auth = HTTPBasicAuth(userid, pwd) if userid and pwd else None
        self.proxies = proxies
        self.timeout = timeout

    def _build_url(self, endpoint):
        """Join ``base_url`` and ``endpoint`` with exactly one slash."""
        if not endpoint:
            return self.base_url
        base = str(self.base_url).rstrip("/")
        path = str(endpoint).lstrip("/")
        return f"{base}/{path}"

    def _request(self, method, endpoint, failure_message, cls, **request_kwargs):
        """Send one request and convert the outcome into an ``ExecutionResult``."""
        result = ExecutionResult()
        url = self._build_url(endpoint)
        try:
            response = requests.request(
                method,
                url,
                auth=self.auth,
                proxies=self.proxies,
                timeout=self.timeout,
                **request_kwargs,
            )
            result = self.handle_response(response, cls, result)
        except Exception as e:
            # Boundary of the result-object API: transport errors and JSON
            # decoding errors are reported to the caller, never raised.
            result.set_failure(error=e, message=failure_message)
        return result

    # NOTE: ``verify`` defaults to False in every verb method, i.e. TLS
    # certificate verification is off unless the caller passes verify=True.
    # The default is kept for backward compatibility.

    def post(self, endpoint, params=None, headers=None, verify=False, cls=None):
        """POST ``params`` as a JSON body.

        :param endpoint: path appended to ``base_url``; falsy means ``base_url``
        :param params: object sent as the JSON request body
        :param headers: extra HTTP headers
        :param verify: forwarded to ``requests`` (TLS verification)
        :param cls: optional class with ``from_dict`` used to parse ``Data``
        :return: ExecutionResult
        """
        return self._request("POST", endpoint, "Failed to post!", cls,
                             json=params, headers=headers, verify=verify)

    def get(self, endpoint, params=None, headers=None, verify=False, cls=None):
        """GET with ``params`` sent as the query string.

        :param endpoint: path appended to ``base_url``; falsy means ``base_url``
        :param params: mapping sent as URL query parameters
        :param headers: extra HTTP headers
        :param verify: forwarded to ``requests`` (TLS verification)
        :param cls: optional class with ``from_dict`` used to parse ``Data``
        :return: ExecutionResult
        """
        return self._request("GET", endpoint, "Failed to get!", cls,
                             params=params, headers=headers, verify=verify)

    def put(self, endpoint, params=None, headers=None, verify=False, cls=None):
        """PUT ``params`` as a JSON body; same arguments as :meth:`post`."""
        return self._request("PUT", endpoint, "Failed to put!", cls,
                             json=params, headers=headers, verify=verify)

    def delete(self, endpoint, params=None, headers=None, verify=False, cls=None):
        """DELETE with ``params`` as a JSON body; same arguments as :meth:`post`."""
        return self._request("DELETE", endpoint, "Failed to delete!", cls,
                             json=params, headers=headers, verify=verify)

    def handle_response(self, response, cls, result):
        """Translate an HTTP response into ``result``.

        Success requires both HTTP status 200 and ``'Code' == 200`` in the
        JSON body. On success the result data is a dict with the keys
        ``code``, ``msg`` and ``data``.

        :param response: response object exposing ``status_code`` and ``json()``
        :param cls: optional class with ``from_dict``; when given, ``Data`` is
            converted with :meth:`parse_data_as_class`
        :param result: ExecutionResult to fill in
        :return: the same ``result`` object
        """
        if response.status_code != 200:
            # Report the real status; this path is shared by all verbs, so a
            # verb-specific message would be misleading.
            result.set_failure(
                error=None,
                message=f"Request failed with HTTP status {response.status_code}!",
            )
            return result

        payload = response.json()
        code = payload.get('Code')
        msg = payload.get('Msg')
        data = payload.get('Data')

        if code != 200:
            # Application-level failure: surface the service's own message.
            result.set_failure(error=None, message=msg)
            return result

        if cls:
            data = self.parse_data_as_class(data, cls)
        result.set_success(message="get post/get successfully!!",
                           data={"code": code, "msg": msg, "data": data})
        return result

    @staticmethod
    def parse_data_as_class(data, cls):
        """Convert ``data`` into a list of ``cls`` instances.

        :param data: list of dicts or a single dict to convert
        :param cls: class providing a ``from_dict`` method
        :return: list of converted items; a dict yields a one-element list.
            Returns an empty list when ``data`` is neither a list nor a dict,
            or when ``from_dict`` raises ValueError, TypeError or KeyError
            (deliberate best-effort parsing).
        """
        if isinstance(data, dict):
            items = [data]
        elif isinstance(data, list):
            items = data
        else:
            return []

        try:
            return [cls.from_dict(item) for item in items]
        except (ValueError, TypeError, KeyError):
            return []
                    
                    

DB操作

import pyodbc
import pandas as pd
import re
from Class.ExecutionResult import ExecutionResult

class DBUtil:
    """pyodbc / pandas helper for querying and bulk-loading a database.

    Connection handling shared by the query and bulk methods:

    * When ``conn`` is omitted, a connection is opened from
      ``self.connectionString`` and is always closed before returning.
    * A caller-supplied ``conn`` is closed after a successful call (this has
      always been the behavior). After a failure it is left open and
      untouched, so the caller decides whether to roll back, retry or close.

    NOTE: table and column names are interpolated directly into SQL text
    (identifiers cannot be bound as parameters). Only pass trusted values.
    """

    def __init__(self, connectionString):
        """:param connectionString: ODBC connection string used by default."""
        self.connectionString = connectionString

    def getEQRConnection(self):
        """Open a connection with the connection string given at construction.

        :return: ExecutionResult whose data is the open connection on success
        """
        return self.getConnection(self.connectionString)

    def getConnection(self, connectionString):
        """Open a connection with an explicit connection string.

        :param connectionString: ODBC connection string
        :return: ExecutionResult whose data is the open connection on success
        """
        result = ExecutionResult()
        try:
            conn = pyodbc.connect(connectionString)
            result.set_success(message="get connection successfully!!", data=conn)
        except Exception as e:
            result.set_failure(error=e, message="Failed to get connection!")
        return result

    def closeConnection(self, conn):
        """Close ``conn``; a close error is printed, never raised."""
        try:
            conn.close()
        except Exception as e:
            print("close db error:", e)

    def _release(self, conn, owned, succeeded, cursor=None):
        """Close the cursor, then the connection if it is ours or the call succeeded."""
        if cursor is not None:
            try:
                cursor.close()
            except Exception as e:
                print("close db error:", e)
        if conn is not None and (owned or succeeded):
            self.closeConnection(conn)

    def execute_query(self, sql_query, conn=None):
        """Run ``sql_query`` and return the rows as a DataFrame.

        :param sql_query: SQL text passed to ``pandas.read_sql_query``
        :param conn: optional open connection (see class docstring)
        :return: ExecutionResult whose data is the DataFrame on success
        """
        result = ExecutionResult()
        owned = conn is None
        succeeded = False
        try:
            if owned:
                conn = pyodbc.connect(self.connectionString)
            data = pd.read_sql_query(sql_query, conn)
            result.set_success(message="Exec command successfully!!", data=data)
            succeeded = True
        except Exception as e:
            result.set_failure(error=e, message="Exec command failed!")
        finally:
            self._release(conn, owned, succeeded)
        return result

    def prepare_commandstring(self, query_template, params):
        """Turn a template with named placeholders into a ``?``-style query.

        Example template::

            SELECT * FROM your_table_name
            WHERE column1 = {param1} AND column2 = {param2} OR column3 = {param1}

        Each ``{name}`` becomes ``?`` and the matching value from ``params``
        is appended to the parameter tuple, once per occurrence and in order
        of appearance. Escaped braces (``{{`` / ``}}``) are emitted as literal
        braces and do not consume a parameter.

        :param query_template: query text using ``str.format`` field syntax
        :param params: dict mapping placeholder names to values
        :return: tuple ``(query, param_values)``
        :raises KeyError: if a placeholder has no entry in ``params``
        """
        import string

        # Parse with the same grammar str.format uses, so the generated '?'
        # markers and the collected values can never get out of step.
        pieces = []
        values = []
        for literal, field_name, _spec, _conversion in string.Formatter().parse(query_template):
            pieces.append(literal)
            if field_name is not None:
                pieces.append('?')
                values.append(params[field_name])
        return ''.join(pieces), tuple(values)

    @staticmethod
    def _build_merge_sql(table_name, temp_table_name, columns, key_columns,
                         update_columns, null_check_columns=None):
        """Build the MERGE statement that upserts the temp table into the target."""
        if not key_columns:
            raise ValueError("key_columns must not be empty")

        on_conditions = [
            f"ISNULL(target.{col}, '') = ISNULL(source.{col}, '')" for col in key_columns
        ]
        if null_check_columns:
            on_conditions += [f"target.{col} IS NULL" for col in null_check_columns]
        on_clause = " AND ".join(on_conditions)

        # Without update columns a WHEN MATCHED clause would be syntactically
        # invalid, so the statement degrades to insert-only.
        matched_clause = ""
        if update_columns:
            changed = " OR ".join(
                f"ISNULL(target.{col},'') <> ISNULL(source.{col},'')" for col in update_columns
            )
            assignments = ", ".join(f"target.{col} = source.{col}" for col in update_columns)
            matched_clause = (
                f"WHEN MATCHED AND {changed} THEN\n"
                f"    UPDATE SET {assignments}, target.MODIFY_TIME = GETDATE()\n"
            )

        insert_columns = ', '.join(columns)
        insert_values = ', '.join(f"source.{col}" for col in columns)
        return (
            f"MERGE INTO {table_name} AS target\n"
            f"USING {temp_table_name} AS source\n"
            f"ON {on_clause}\n"
            f"{matched_clause}"
            "WHEN NOT MATCHED THEN\n"
            f"    INSERT ({insert_columns}, CREATE_TIME) VALUES ({insert_values}, GETDATE());"
        )

    def bulk_insert_or_update(self, df, table_name, key_columns
                              , update_columns=None, conn=None
                              , null_check_columns=None):
        """Upsert the rows of ``df`` into ``table_name`` through a temp table.

        The rows are loaded into a session temp table shaped like the target,
        then merged: matching rows whose update columns differ are updated
        (``MODIFY_TIME`` set to ``GETDATE()``), non-matching rows are inserted
        (``CREATE_TIME`` set to ``GETDATE()``). The target table must
        therefore have ``MODIFY_TIME`` and ``CREATE_TIME`` columns.

        :param df: pandas DataFrame holding the rows to insert or update
        :param table_name: str, target table name, may include the schema
        :param key_columns: list of columns used to match existing rows
        :param update_columns: list of columns to update; ``None`` means every
            DataFrame column that is not a key column. When this ends up
            empty, matched rows are left unchanged (insert-only merge).
        :param conn: optional open connection (see class docstring)
        :param null_check_columns: optional list of target columns that must
            be NULL for a row to count as a match
        :return: ExecutionResult (data is None on success)

        Example::

            db = DBUtil("DRIVER={SQL Server}; SERVER=<server>; DATABASE=<db>; UID=<user>; PWD=<password>")
            df = pd.DataFrame({"empNo": ["A001", "A002"], "empName": ["Alice", "Bob"]})
            result = db.bulk_insert_or_update(df, "dbo.empTest",
                                              key_columns=["empNo"],
                                              update_columns=["empName"])
        """
        result = ExecutionResult()
        owned = conn is None
        succeeded = False
        cursor = None
        try:
            if owned:
                conn = pyodbc.connect(self.connectionString)
            cursor = conn.cursor()
            columns = list(df.columns)

            # Create an empty temp table with the same shape as the target.
            temp_table_name = f"#{table_name.replace('.', '_')}_tempforpy"
            cursor.execute(f"SELECT TOP 0 * INTO {temp_table_name} FROM {table_name}")
            conn.commit()

            # Load the DataFrame rows into the temp table. An empty frame is
            # skipped because executemany rejects an empty parameter list.
            rows = df.values.tolist()
            if rows:
                column_list = ', '.join(columns)
                placeholders = ', '.join('?' for _ in columns)
                cursor.executemany(
                    f"INSERT INTO {temp_table_name} ({column_list}) VALUES ({placeholders})",
                    rows,
                )
                conn.commit()

            if update_columns is None:
                update_columns = [col for col in columns if col not in key_columns]

            merge_sql = self._build_merge_sql(
                table_name, temp_table_name, columns,
                key_columns, update_columns, null_check_columns,
            )
            cursor.execute(merge_sql)
            conn.commit()

            cursor.execute(f"DROP TABLE {temp_table_name}")
            conn.commit()

            result.set_success(message="bulk insert / update successfully !!!", data=None)
            succeeded = True
        except Exception as e:
            # NOTE: on failure a caller-supplied connection is left as-is; the
            # temp table may still exist in that session.
            result.set_failure(error=e, message="bulk insert / update failed !!!")
        finally:
            self._release(conn, owned, succeeded, cursor)

        return result

    def bulk_insert(self, df, table_name, conn=None):
        """Insert every row of ``df`` into ``table_name``.

        An ``updatetime`` column is appended to the insert and filled with
        ``GETDATE()``, so the target table must have that column.

        :param df: pandas DataFrame holding the rows to insert
        :param table_name: str, target table name
        :param conn: optional open connection (see class docstring)
        :return: ExecutionResult (data is None on success)

        Example::

            db = DBUtil("DRIVER={SQL Server}; SERVER=<server>; DATABASE=<db>; UID=<user>; PWD=<password>")
            df = pd.DataFrame({"empNo": ["AB404", "AB305"], "empName": ["Alice", "Bob"]})
            result = db.bulk_insert(df, "dbo.empTest")
        """
        result = ExecutionResult()
        owned = conn is None
        succeeded = False
        cursor = None
        try:
            if owned:
                conn = pyodbc.connect(self.connectionString)
            cursor = conn.cursor()

            columns = ', '.join(df.columns) + ', updatetime'
            placeholders = ', '.join('?' for _ in df.columns) + ', GETDATE()'
            insert_sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"

            # executemany rejects an empty parameter list; nothing to insert
            # is treated as a successful no-op.
            rows = df.values.tolist()
            if rows:
                cursor.executemany(insert_sql, rows)
            conn.commit()

            result.set_success(message="bulk insert successfully !!!", data=None)
            succeeded = True
        except Exception as e:
            result.set_failure(error=e, message="bulk insert failed !!!")
        finally:
            self._release(conn, owned, succeeded, cursor)
        return result

    def query_to_dataframe(self, table_name, columns, conditions=None, conn=None):
        """Select ``columns`` from ``table_name`` and return a DataFrame.

        Unlike the other methods this one does not wrap errors in an
        ExecutionResult: exceptions from connecting or querying propagate.

        :param table_name: str, table to read from
        :param columns: list of column names / expressions to select
        :param conditions: optional list of SQL predicates joined with AND;
            they are inserted into the query text verbatim
        :param conn: optional open connection (see class docstring)
        :return: pandas DataFrame with the query result
        """
        owned = conn is None
        if owned:
            conn = pyodbc.connect(self.connectionString)

        succeeded = False
        try:
            query = f"SELECT {', '.join(columns)} FROM {table_name}"
            if conditions:
                query += f" WHERE {' AND '.join(conditions)}"

            df = pd.read_sql_query(query, conn)
            succeeded = True
            return df
        finally:
            self._release(conn, owned, succeeded)

打雜打久了,就變成打雜妹

程式寫久了,就變成老乞丐