接口自动化脚本v2

hkt 发布于 2025-08-09 23 次阅读


在v1版本中还有许多的问题,例如未加入数据库连接,会导致正向用例一直显示重复数据。还有一些需要依赖的数值,例如某某id,需要通过数据库查询并将值作为变量进行关联。

v2版本

v2版本增加mysql数据库的连接和增删改查。做出的改变有:

  1. 在数据配置层config.ini中增加mysql节点的信息
  2. 在数据配置层增加sql_data.json文件,也是根据模块名称、接口名称、sql语句的标题和具体的数据组成
  3. 在数据配置层中excel中增加sql语句的标题和sql语句的类型和查询和需要更新的值(依赖)
  4. 在公共层中增加db.py初始化连接
  5. 在公共层中增加read_excel中获取sql类型、sql数据、需要更新的值
  6. 在公共层中red.ini.py中增加获取数据库连接信息
  7. 在请求层中加入是否执行sql增加、删除、增加和删除的判断,在请求前做出相应的删除和关联相应的依赖
  8. 在请求层中的conftest.py中增加db连接和关闭的装饰器

数据配置层

APIAutoTest.xlsx

整理思路:在APIAutoTest文件中增加sql的语句标题、sql的语句类型、需要更新的变量值

config.ini

整理思路:在配置文件中增加mysql节点的数据库基本信息

sql_data.json

整理思路:增加需要操作的具体sql参数,跟用例数据和期望数据一样用模块名称、接口名称、sql标题来确定一个具体的sql具体

公共层

read.ini.py

整理思路:增加get_mysql_connect_msg方法获取mysql节点下的数据库信息

    def get_mysql_connect_msg(self, key):
        """根据key,获取数据库的链接信息"""
        try:
            return self.conf.get("mysql", key)  # 返回值类型为字符串
        except Exception as e:
            raise e

db.py

整理思路:增加db.py文件,编写单例模式的数据库连接的对象,然后初始化连接对象,创建方法关闭对象。创建delete方法对需要的请求进行删除,创建select方法对需要的请求进行查询,需要创建独立的方法进行关闭。

创建单例模式并初始化对象和关闭对象

import pymysql
from dbutils.pooled_db import PooledDB
from APIAutoTest_v2.common.read_ini import ReadIni

class DB:
    __instance = None
    ini = ReadIni()
    def __new__(cls, *args, **kwargs):
        if cls.__instance is None:
            try:
                # 创建连接池
                cls.pool = PooledDB(
                    creator=pymysql,
                    host=cls.ini.get_mysql_connect_msg("host"),
                    port=int(cls.ini.get_mysql_connect_msg("port")),
                    user=cls.ini.get_mysql_connect_msg("user"),
                    password=cls.ini.get_mysql_connect_msg("password"),
                    database=cls.ini.get_mysql_connect_msg("database"),
                    maxconnections=10,
                    mincached=5,
                    blocking=True,
                    charset="utf8mb4"
                )
            except Exception as e:
                raise e
            else:
                cls.__instance = super().__new__(cls, *args, **kwargs)
        return cls.__instance

    def __init__(self):
        """获取链接对象"""
        self.conn = self.pool.connection()

    def close(self):
        """关闭链接对象"""
        self.conn.close()

创建delete方法

    def delete(self, sql_sentence):
        """执行删除的sql语句"""
        # 判断sql语句是否合法
        if isinstance(sql_sentence, str) and sql_sentence.lower().startswith("delete"):
            # 尝试执行删除的sql语句
            try:
                # 获取游标对象
                with self.conn.cursor() as cursor:
                    # 执行sql语句
                    cursor.execute(sql_sentence)
            except Exception as e:
                raise e
            else:
                # 使用链接对象提交
                self.conn.commit()
        else:
            raise ValueError("删除的sql语句错误")

创建select方法

    def select(self, sql_sentence):
        """执行查询的sql语句"""
        # 判断sql语句是否合法
        if isinstance(sql_sentence, str) and sql_sentence.lower().startswith("select"):
            # 尝试执行sql语句
            try:
                # 获取游标对象
                with self.conn.cursor() as cursor:
                    # 执行sql语句
                    cursor.execute(sql_sentence)
            except Exception as e:
                raise e
            else:
                # 如果没有报错,获取查询的结果
                select_res = cursor.fetchone()
                if select_res:
                    return select_res[0]
        else:
            raise ValueError("查询的sql语句错误")

read_excel.py

整理思路:增加sql语句获取的方法、sql类型获取的方法、需要更新数据获取的方法,最后将这几句追加到二维列表中

创建sql_data方法

    def sql_data(self, row):
        """根据行,获取sql语句"""
        sql_data_key = self.__get_cell_value("k", row)
        module = self.module_name(row)
        api = self.api_name(row)
        if module and api and sql_data_key:
            try:
                return self.sql_data_dict[module][api][sql_data_key]
            except Exception as e:
                raise e

创建sql_type方法

    def sql_type(self, row):
        """根据行,获取sql语句的类型"""
        # 获取sql语句类型的值
        value = self.__get_cell_value("l", row)
        # 判断sql语句类型的值是否不为None,如果不为None,将值转小写再返回,如果为None,返回None
        if value:
            return value.lower()

创建updata_key方法

    def update_key(self, row):
        """根据行,获取更新的key"""
        return self.__get_cell_value("m", row)

追加到get_data方法中

    def get_data(self):
        """将测试使用的数据,存放在一个二维列表中"""
        list_data = []
        for row in range(2, self.ws.max_row + 1):
            req = self.case_req(row) # 请求方法
            mime = self.case_mime(row) # 媒体类型
            url = self.case_url(row)  # url
            case = self.case_data(row) # 用例数据
            expect = self.expect_data(row) # 期望数据
            sql = self.sql_data(row)  # sql语句
            sql_type = self.sql_type(row) # sql语句类型
            update_key = self.update_key(row) # 更新的key
            # 过滤空行
            if req and url and expect:
                list_data.append([req, mime, url, case, expect, sql, sql_type, update_key])
                print([req, mime, url, case, expect, sql, sql_type, update_key])
        else:
            return list_data

请求层

整理思路:在请求层的conftest文件增加db_fix装饰器,将其设置为session等级的作用域,使其能在参数化pytest的方法执行前连接数据库执行后关闭数据库。在test_bpm.py中加入判断请求中是否含有select、delete、select|delete字段,如果有就执行sql语句

conftest.py

@pytest.fixture(scope="session")
def db_fix():
    # 创建DB类对象
    db = DB()
    yield db
    db.close()

tset_bpm.py

import pytest
from APIAutoTest_v2.common.read_excel import ReadExcel

datas = ReadExcel().get_data()

class TestBPM:

    @pytest.mark.parametrize("req, mime, url, case, expect, sql, sql_type, update_key", datas)
    def test_bpm(self, req_fix, db_fix, req, mime, url, case, expect, sql, sql_type, update_key):
        # 判断sql语句的类型是否为delete,如果是,使用DB类对象调用delete方法执行删除的sql语句
        if sql_type == "delete":
            # 使用DB类对象调用delete方法执行删除的sql语句--db_fix就是DB类对象
            db_fix.delete(sql['delete'])

        # 判断sql语句的类型是否为select, 如果是,使用DB类对象调用select方法执行,查询的sql语句
        elif sql_type == "select":
            # 使用DB类对象调用select方法执行,查询的sql语句, 并获取查询的结果
            select_result = db_fix.select(sql["select"])
            # 判断用例数据的类型是否为字符串
            if isinstance(case, str):
                # 用例数据为字符串,字符串被替换之后,会得到一个新的字符串,需要使用原来的变量接收新的用例数据
                case = case.replace(update_key, select_result)
            else:
                # case[update_key] = select_result
                case[update_key] = select_result

        # 判断sql语句的类型是否为select|delete 或者为 delete|select
        elif sql_type == "select|delete" or sql_type == "delete|select":
            # 使用DB类对象调用delete方法执行删除的sql语句--db_fix就是DB类对象
            db_fix.delete(sql['delete'])
            # 使用DB类对象调用select方法执行,查询的sql语句, 并获取查询的结果
            select_result = db_fix.select(sql["select"])
            # 判断用例数据的类型是否为字符串
            if isinstance(case, str):
                # 用例数据为字符串,字符串被替换之后,会得到一个新的字符串,需要使用原来的变量接收新的用例数据
                case = case.replace(update_key, select_result)
            else:
                # case[update_key] = select_result
                case[update_key] = select_result

        # # 使用RequestMethod类对象发生请求,自定义固件req_fix就是RequestMethod类的对象
        res = req_fix.request(req_method=req, req_url=url, req_mime=mime, case_data=case)
        # 断言
        for key in expect.keys():
            assert expect[key] in res.json().get(key)
此作者没有提供个人介绍。
最后更新于 2025-08-12