接口自动化脚本v3与多用户v3.1

hkt 发布于 2025-08-15 22 次阅读


v2版本解决了数据库的连接,以及在进行测试时的查询和删除的需求,但是还未增加日志和通过装日志饰器简化try捕获异常的代码量。还需要解决多用户使用的问题,因此优化代码内容实现相应的功能。

整体思路:首先v3添加日志功能,将日志文件存放至report文件中的log文件夹。那么在哪里添加这行代码?这里就需要先理解__init__.py文件,在Python中,__init__.py 文件是包(package)的核心组成部分。使Python将包含该文件的目录视为可导入的包没有__init__.py的目录会被视为普通目录,在运行包时会首先运行__init__.py文件进行初始化,非常适合编写日志装饰器的代码。

v3版本

在APIAutotest文件中的__init__.py文件中添加日志功能主要使用logging库来实现日志的功能

__init__文件

import functools
import logging
import os
from datetime import datetime


class Log:
    __instance = None

    @classmethod
    def __log(cls):
        # 创建Logger对象
        logger = logging.getLogger()
        # 设置日志文件中,日志写入的级别
        logger.level = logging.NOTSET
        # 动态获取日志文件的路径
        log_dir = os.path.join(os.path.join(os.path.dirname(__file__), "report"), "log")
        log_name = "APIAutoTest_" + datetime.strftime(datetime.now(), "%Y_%m_%d_%H_%M_%S") + ".log"
        log_path = os.path.join(log_dir, log_name)
        # 创建日志文件的handler
        handler = logging.FileHandler(log_path, mode="a", encoding="utf-8")
        # 设置日志在文件中的显示格式。
        formater = logging.Formatter('%(levelname)s - %(asctime)s - : %(message)s')
        # 给日志文件的handler添加formatter
        handler.setFormatter(formater)
        # 给Logger对象添加日志文件的handler
        logger.addHandler(handler)
        return logger

    def __new__(cls, *args, **kwargs):
        if cls.__instance is None:
            # 获取logger对象
            cls.logger = cls.__log()
            cls.__instance = super().__new__(cls, *args, **kwargs)
        return cls.__instance

    def info(self, msg):
        self.logger.info(msg)

    def debug(self, msg):
        self.logger.debug(msg)

    def warning(self, msg):
        self.logger.warning(msg)

    def error(self, msg):
        self.logger.error(msg)

    def critical(self, msg):
        self.logger.critical(msg)

    def fatal(self, msg):
        self.logger.fatal(msg)


def log(param):
    logger = Log()
    @functools.wraps(param) # 让被装饰对象在接收装饰器的返回值时,使用原来的名称
    def inner(*args, **kwargs):
        # log.info("info级别的日志")
        # __code__: 代码对象
        # co_filename: 获取代码所在的文件,绝对路径
        # co_firstlineno: 获取代码的首行
        logger.info(f"执行的功能为:{param.__name__}, 其功能描述为:{param.__doc__}, 传入的参数为:{args if args else kwargs}, 所在的文件为:{param.__code__.co_filename},所在的行为:{param.__code__.co_firstlineno}")
        try:
            result = param(*args, **kwargs)
        except Exception as e:
            logger.error(f"执行的功能为:{param.__name__}, 其功能描述为:{param.__doc__}, 传入的参数为:{args if args else kwargs}, 执行时报错,错误类型为:{type(e)}, 错误的描述为:{e},所在的文件为:{param.__code__.co_filename},所在的行为:{param.__code__.co_firstlineno}")
            raise e
        else:
            return result
    return inner

修改完成后所有try的语句改为使用@log来装饰

例如

import openpyxl

from APIAutoTest_v3 import log
from APIAutoTest_v3.common.read_ini import ReadIni
from APIAutoTest_v3.common.read_json import read_json


class ReadExcel:
    @log
    def __init__(self):
        """获取所有json文件的路径,并读取,再获取excel文件路径和工作表的名称,加载工作簿获取工作表"""
        self.ini = ReadIni()
        case_data_path = self.ini.get_file_path("case")
        expect_data_path = self.ini.get_file_path("expect")
        sql_data_path = self.ini.get_file_path("sql")
        self.case_data_dict = read_json(case_data_path)
        self.expect_data_dict = read_json(expect_data_path)
        self.sql_data_dict = read_json(sql_data_path)

        excel_path = self.ini.get_file_path("excel")
        table_name = self.ini.get_table_name("name")
        wb = openpyxl.load_workbook(excel_path)
        self.ws = wb[table_name]

    @log
    def __get_cell_value(self, column, row):
        """获取指定单元格数据"""
        value = self.ws[column + str(row)].value
        if isinstance(value, str) and len(value.strip()) > 0:
            return value.strip()

    @log
    def module_name(self, row):
        """根据行,获取模块名称"""
        return self.__get_cell_value("b", row)

    @log
    def api_name(self, row):
        """根据行,获取接口名称"""
        return self.__get_cell_value("c", row)

    @log
    def case_req(self, row):
        """根据行,获取用例的请求方法"""
        return self.__get_cell_value("f", row)

    @log
    def case_url(self, row):
        """根据行,获取用例的url"""
        value = self.__get_cell_value("h", row)
        if value:
            return self.ini.get_host("host") + value

    @log
    def case_mime(self, row):
        """根据行,获取用例的媒体类型"""
        value = self.__get_cell_value("g", row)
        if value:
            return value.lower()

    @log
    def case_data(self, row):
        """根据行,获取用例数据"""
        case_data_key = self.__get_cell_value("i", row)
        module = self.module_name(row)
        api = self.api_name(row)
        if case_data_key and module and api:
            return self.case_data_dict[module][api][case_data_key]

    @log
    def expect_data(self, row):
        """根据行,获取期望数据"""
        expect_data_key = self.__get_cell_value("j", row)
        module = self.module_name(row)
        api = self.api_name(row)
        if expect_data_key and module and api:
            return self.expect_data_dict[module][api][expect_data_key]

    @log
    def sql_data(self, row):
        """根据行,获取sql语句"""
        sql_data_key = self.__get_cell_value("k", row)
        module = self.module_name(row)
        api = self.api_name(row)
        if sql_data_key and module and api:
            return self.sql_data_dict[module][api][sql_data_key]

    @log
    def sql_type(self, row):
        """根据行,获取sql语句的类型"""
        value = self.__get_cell_value("l", row)
        if value:
            return value.lower()

    @log
    def update_key(self, row):
        """根据行,获取更新的key"""
        return self.__get_cell_value("m", row)

    @log
    def get_data(self):
        """获取测试数据,数据存放在一个二维列表中"""
        list_data = []
        for row in range(2, self.ws.max_row+1):
            req = self.case_req(row)
            url = self.case_url(row)
            mime = self.case_mime(row)
            case = self.case_data(row)
            expect = self.expect_data(row)
            sql = self.sql_data(row)
            sql_type = self.sql_type(row)
            update_key = self.update_key(row)
            if req and url and expect:
                list_data.append([req, url, mime, case, expect, sql, sql_type, update_key])
        else:
            return list_data

v3.1

整理思路:在v3.1中可以加入多用户的功能和allure功能(可选),将原来的配置文件改为系统的配置文件并分出部分数据内容只保留host节点和mysql节点,在此基础上增加user节点下用户的目录名称,其余的用户有单独的配置文件保留file节点和table节点。然后对data_config文件进行分组,一个用户一个目录保存sql语句、用户配置、期望数据、用例数据、用例管理文件。然后修改部分读取的代码。

更新的内容有:

  1. 数据配置层中将分为一个用户一个目录,所有的数据文件放在用户目录中,系统配置文件中保留数据库和域名的节点,新增用户目录名成的节点。每个用户目录下有自己的配置文件保存数据文件的名称和工作表名。
  2. 公共层中减少原本配置文件的读取,增加用户配置文件的读取。
  3. 用例层分为一个用户一个用例文件目录,存放用户个人的请求测试文件。
  4. 在用例层中可增加@allure辅助生成可视化的报告

数据配置层

以这样的形式独立出用户的配置文件、数据文件、配置文件。

公共层

read_system_ini.py

增加了get_user_name获取用户存放数据的目录名称

import configparser
import os
from APIAutoTest_v3_1 import log

class ReadSystemIni:
    __instance = None

    @log
    def __new__(cls, *args, **kwargs):
        """获取ini文件路径,并读取"""
        if cls.__instance is None:
            cls.data_config_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "data_config")
            ini_path = os.path.join(cls.data_config_path, "system_config.ini")  # 更改系统配置文件的名称
            cls.conf = configparser.ConfigParser()
            cls.conf.read(ini_path, encoding="utf-8")
            cls.__instance = super().__new__(cls, *args, **kwargs)
        return cls.__instance

    @log
    def get_host(self, key):
        """根据key,获取被测系统的域名"""
        return self.conf.get("host", key)

    @log
    def get_mysql_connect_msg(self, key):
        """根据key,获取获取数据库的链接信息"""
        return self.conf.get("mysql", key)

    @log
    def get_user_name(self, key):
        """根据key,获取用户存放数据的目录名称"""
        return self.conf.get("user", key)

read_user_ini.py

增加用户文件路径读取以及工作表名称读取。

import configparser
import os
from APIAutoTest_v3_1 import log
from APIAutoTest_v3_1.common.read_system_ini import ReadSystemIni

class ReadUserIni:

    @log
    def __init__(self, user_name):
        """获取用户ini的配置文件路径,并读取"""
        # 创建读取系统配置文件类的对象
        system_ini = ReadSystemIni()
        # 调用get_user_name获取用户存放数据的目录的名称
        user_data_name = system_ini.get_user_name(user_name)
        # 用户存放数据的目录的路径拼接
        self.user_data_dir = os.path.join(system_ini.data_config_path, user_data_name)
        # 拼接用户的配置文件路径
        user_ini_path = os.path.join(self.user_data_dir, "user_config.ini")
        # 创建Configparser对象
        self.conf = configparser.ConfigParser()
        # 读取ini文件
        self.conf.read(user_ini_path, encoding="utf-8")

    @log
    def get_file_path(self, key):
        """根据key,获取file节点下文件的路径"""
        return os.path.join(self.user_data_dir, self.conf.get("file", key))

    @log
    def get_table_name(self, key):
        """根据key,获取excel中工作表的名称"""
        return self.conf.get("table", key)

read_json.py

import json
import os
from APIAutoTest_v3_1 import log


@log
def read_json(file_path):
    """读取json文件,将json文件的内容序列化为python对象,再返回"""
    if isinstance(file_path, str) and file_path.lower().endswith(".json") and os.path.isfile(file_path):
        with open(file_path, mode="r", encoding="utf-8") as f:
            return json.load(f)
    else:
        raise FileNotFoundError("文件路径错误")


if __name__ == '__main__':
    read_json("xxx.json")

db.py

import pymysql
from dbutils.pooled_db import PooledDB

from APIAutoTest_v3_1 import log
from APIAutoTest_v3_1.common.read_system_ini import ReadSystemIni


class DB:
    __instance = None
    ini = ReadSystemIni()

    @log
    def __new__(cls, *args, **kwargs):
        """创建连接池"""
        if cls.__instance is None:
            cls.pool = PooledDB(
                creator=pymysql,
                host=cls.ini.get_mysql_connect_msg("host"),
                port=int(cls.ini.get_mysql_connect_msg("port")),
                user=cls.ini.get_mysql_connect_msg("user"),
                password=cls.ini.get_mysql_connect_msg("password"),
                database=cls.ini.get_mysql_connect_msg("database"),
                maxconnections=10,
                mincached=5,
                blocking=True
            )
            cls.__instance = super().__new__(cls, *args, **kwargs)
        return cls.__instance

    @log
    def __init__(self):
        """获取链接对象"""
        self.conn = self.pool.connection()

    @log
    def delete(self, sql_sentence):
        """执行删除的sql语句"""
        if isinstance(sql_sentence, str) and sql_sentence.lower().startswith("delete"):
            with self.conn.cursor() as cursor:
                cursor.execute(sql_sentence)
            self.conn.commit()
        else:
            raise ValueError("删除的sql语句错误")

    @log
    def select(self, sql_sentence):
        """执行查询的sql语句"""
        if isinstance(sql_sentence, str) and sql_sentence.lower().startswith("select"):
            with self.conn.cursor() as cursor:
                cursor.execute(sql_sentence)
                select_result = cursor.fetchone()
                if select_result:
                    return select_result[0]
        else:
            raise ValueError("删除的sql语句错误")

    @log
    def close(self):
        """关闭链接对象"""
        self.conn.close()

read_excel

import openpyxl
from APIAutoTest_v3_1 import log
from APIAutoTest_v3_1.common.read_json import read_json
from APIAutoTest_v3_1.common.read_system_ini import ReadSystemIni
from APIAutoTest_v3_1.common.read_user_ini import ReadUserIni


class ReadExcel:
    @log
    def __init__(self, user_name):
        """获取所有json文件的路径,并读取,再获取excel文件路径和工作表的名称,加载工作簿获取工作表"""
        self.ini = ReadUserIni(user_name)  # 创建ReadUserIni对象
        case_data_path = self.ini.get_file_path("case")
        expect_data_path = self.ini.get_file_path("expect")
        sql_data_path = self.ini.get_file_path("sql")
        self.case_data_dict = read_json(case_data_path)
        self.expect_data_dict = read_json(expect_data_path)
        self.sql_data_dict = read_json(sql_data_path)

        excel_path = self.ini.get_file_path("excel")
        table_name = self.ini.get_table_name("name")
        wb = openpyxl.load_workbook(excel_path)
        self.ws = wb[table_name]

    @log
    def __get_cell_value(self, column, row):
        """获取指定单元格数据"""
        value = self.ws[column + str(row)].value
        if isinstance(value, str) and len(value.strip()) > 0:
            return value.strip()

    @log
    def module_name(self, row):
        """根据行,获取模块名称"""
        return self.__get_cell_value("b", row)

    @log
    def api_name(self, row):
        """根据行,获取接口名称"""
        return self.__get_cell_value("c", row)

    @log
    def case_title(self, row):
        """根据行,获取用例标题"""
        return self.__get_cell_value("d", row)

    @log
    def case_level(self, row):
        """根据行,获取用例的等级"""
        return self.__get_cell_value("e", row)

    @log
    def case_req(self, row):
        """根据行,获取用例的请求方法"""
        return self.__get_cell_value("f", row)

    @log
    def case_url(self, row):
        """根据行,获取用例的url"""
        value = self.__get_cell_value("h", row)
        if value:
            return ReadSystemIni().get_host("host") + value

    @log
    def case_mime(self, row):
        """根据行,获取用例的媒体类型"""
        value = self.__get_cell_value("g", row)
        if value:
            return value.lower()

    @log
    def case_data(self, row):
        """根据行,获取用例数据"""
        case_data_key = self.__get_cell_value("i", row)
        module = self.module_name(row)
        api = self.api_name(row)
        if case_data_key and module and api:
            return self.case_data_dict[module][api][case_data_key]

    @log
    def expect_data(self, row):
        """根据行,获取期望数据"""
        expect_data_key = self.__get_cell_value("j", row)
        module = self.module_name(row)
        api = self.api_name(row)
        if expect_data_key and module and api:
            return self.expect_data_dict[module][api][expect_data_key]

    @log
    def sql_data(self, row):
        """根据行,获取sql语句"""
        sql_data_key = self.__get_cell_value("k", row)
        module = self.module_name(row)
        api = self.api_name(row)
        if sql_data_key and module and api:
            return self.sql_data_dict[module][api][sql_data_key]

    @log
    def sql_type(self, row):
        """根据行,获取sql语句的类型"""
        value = self.__get_cell_value("l", row)
        if value:
            return value.lower()

    @log
    def update_key(self, row):
        """根据行,获取更新的key"""
        return self.__get_cell_value("m", row)

    @log
    def get_data(self):
        """获取测试数据,数据存放在一个二维列表中"""
        list_data = []
        for row in range(2, self.ws.max_row+1):
            module = self.module_name(row)
            api = self.api_name(row)
            title = self.case_title(row)
            level = self.case_level(row)
            req = self.case_req(row)
            url = self.case_url(row)
            mime = self.case_mime(row)
            case = self.case_data(row)
            expect = self.expect_data(row)
            sql = self.sql_data(row)
            sql_type = self.sql_type(row)
            update_key = self.update_key(row)
            if req and url and expect:
                list_data.append([module, api, title, level, req, url, mime, case, expect, sql, sql_type, update_key])
        else:
            return list_data


if __name__ == '__main__':
    excel = ReadExcel("user2")
    print(excel.get_data())

请求方法层

request_method

import requests

from APIAutoTest_v3_1 import log
from APIAutoTest_v3_1.common.read_system_ini import ReadSystemIni


class RequestMethod:
    mime_json = {"content-type": "application/json"}
    mime_form = {"content-type": "application/x-www-form-urlencoded"}
    mime_text = {"content-type": "text/plain"}

    @log
    def __init__(self):
        """关联token"""
        self.bpm_session = requests.sessions.Session()
        login_url = ReadSystemIni().get_host("host") + "/auth"
        login_data = {"username":"admin","password":"aAoEx5BrShcwKuQJ+imnnoL5zlzp8l7aqFr0X7nXdaLKVNRuDgQlMneksilczRbbYB1jBadtShU5EpKjgy85AMXHibECDHqSCJ5B5tmiqevKzTsWI2w/vVHph+yQzYREb2znhKaL5b2Sh3JcqNXFw5DFzNixLps1OsKPCsRp4Ys="}
        self.bpm_session.headers["Authorization"] = f"Bearer {self.bpm_session.post(login_url, json=login_data).json().get('token')}"

    @log
    def request(self, req_method, req_url, req_mime, case_data):
        """根据不同的媒体类型和不同用例数据,使用不同的关键字传参"""
        if req_mime == "application/json" or req_mime == "json":
            if isinstance(case_data, str):
                return self.bpm_session.request(method=req_method, url=req_url, data=case_data, headers=self.mime_json)
            else:
                return self.bpm_session.request(method=req_method, url=req_url, json=case_data)

        elif req_mime == "application/x-www-form-urlencoded" or req_mime == "form":
            return self.bpm_session.request(method=req_method, url=req_url, data=case_data, headers=self.mime_form)

        elif req_mime is None:
            if case_data:
                return self.bpm_session.request(method=req_method, url=req_url, params=case_data)
            else:
                return self.bpm_session.request(method=req_method, url=req_url)

        elif req_mime == "query|json" or req_mime == "json|query":
            # 分别取出地址栏和请求体传参的数据
            query = case_data["query"]
            body = case_data["body"]
            # 判断请求体传参的数据类型,如果为字符串使用data传参,如果不是字符串使用json传参
            if isinstance(body, str):
                return self.bpm_session.request(method=req_method, url=req_url, params=query, data=body, headers=self.mime_json)
            else:
                return self.bpm_session.request(method=req_method, url=req_url, params=query, json=body)

        # 判断媒体类型是否为text/plain
        elif req_mime == 'text/plain' or req_mime == "text":
            return self.bpm_session.request(method=req_method, url=req_url, data=case_data, headers=self.mime_text)

        else:
            raise ValueError("请求的媒体类型错误")

用例层

整理思路:用例层的文件也需要进行分层,一个用户一个文件目录,不同的用户调用的方式可以不同。

test_xxx.py

import pytest
import allure
from APIAutoTest_v3_1 import log
from APIAutoTest_v3_1.common.read_excel import ReadExcel

datas = ReadExcel("user1").get_data()

class TestBPM:

    @log
    @allure.epic("BPM-OA系统-demo")
    # @allure.feature("模块名称")
    # @allure.story("接口名称")
    # @allure.title("用例标题")
    # @allure.severity("用例等级")
    @pytest.mark.parametrize("module, api, title, level, req, url, mime, case, expect, sql, sql_type, update_key", datas)
    def test_bpm(self, req_fix, db_fix, module, api, title, level, req, url, mime, case, expect, sql, sql_type, update_key):
        """测试bpm系统"""
        allure.dynamic.feature(module)
        allure.dynamic.story(api)
        allure.dynamic.title(title)
        allure.dynamic.severity(level)
        # 执行sql语句
        if sql_type == "delete":
            db_fix.delete(sql['delete'])

        elif sql_type == "select":
            select_result = db_fix.select(sql['select'])
            if isinstance(case, str):
                case = case.replace(update_key, select_result)
            else:
                case[update_key] = select_result

        elif sql_type == "select|delete" or sql_type == "delete|select":
            db_fix.delete(sql['delete'])
            select_result = db_fix.select(sql['select'])
            if isinstance(case, str):
                case = case.replace(update_key, select_result)
            else:
                case[update_key] = select_result

        # 发送请求
        res = req_fix.request(req_method=req, req_url=url, req_mime=mime, case_data=case)
        try:
            for key in expect:
                assert expect[key] == res.json().get(key)
        except AssertionError:
            raise AssertionError(f"""
            断言失败, 
            请求方法为:{req},
            请求的url为:{url}, 
            请求的媒体类型为:{mime}, 
            用例数据为:{case}, 
            sql语句为:{sql}, 
            sql语句类型为:{sql_type},
            更新的key为:{update_key}
            期望数据为:{expect}
            服务器返回的数据为:{res.text}
            """)

conftest

import pytest

from APIAutoTest_v3_1.common.db import DB
from APIAutoTest_v3_1.request_method.request_method import RequestMethod


@pytest.fixture(scope="session")
def req_fix():
    """创建RequestMethod类对象"""
    req = RequestMethod()
    yield req
    req.bpm_session.close()


@pytest.fixture(scope="session")
def db_fix():
    """创建DB类对象"""
    db = DB()
    yield db
    db.close()

这样设计就能够让用户根据自己的方式调用这些数据和请求方式来进行测试

此作者没有提供个人介绍。
最后更新于 2025-08-15