在v1版本中还有许多的问题,例如未加入数据库连接,会导致正向用例一直显示重复数据。还有一些需要依赖的数值,例如某某id,需要通过数据库查询并将值作为变量进行关联。
v2版本
v2版本增加mysql数据库的连接和增删改查。做出的改变有:
- 在数据配置层config.ini中增加mysql节点的信息
- 在数据配置层增加sql_data.json文件,也是根据模块名称、接口名称、sql语句的标题和具体的数据组成
- 在数据配置层中excel中增加sql语句的标题和sql语句的类型和查询和需要更新的值(依赖)
- 在公共层中增加db.py初始化连接
- 在公共层中增加read_excel中获取sql类型、sql数据、需要更新的值
- 在公共层中red.ini.py中增加获取数据库连接信息
- 在请求层中加入是否执行sql增加、删除、增加和删除的判断,在请求前做出相应的删除和关联相应的依赖
- 在请求层中的conftest.py中增加db连接和关闭的装饰器
数据配置层
APIAutoTest.xlsx
整理思路:在APIAutoTest文件中增加sql的语句标题、sql的语句类型、需要更新的变量值

config.ini
整理思路:在配置文件中增加mysql节点的数据库基本信息

sql_data.json
整理思路:增加需要操作的具体sql参数,跟用例数据和期望数据一样用模块名称、接口名称、sql标题来确定一个具体的sql具体

公共层
read.ini.py
整理思路:增加get_mysql_connect_msg方法获取mysql节点下的数据库信息
def get_mysql_connect_msg(self, key):
"""根据key,获取数据库的链接信息"""
try:
return self.conf.get("mysql", key) # 返回值类型为字符串
except Exception as e:
raise e
db.py
整理思路:增加db.py文件,编写单例模式的数据库连接的对象,然后初始化连接对象,创建方法关闭对象。创建delete方法对需要的请求进行删除,创建select方法对需要的请求进行查询,需要创建独立的方法进行关闭。
创建单例模式并初始化对象和关闭对象
import pymysql
from dbutils.pooled_db import PooledDB
from APIAutoTest_v2.common.read_ini import ReadIni
class DB:
__instance = None
ini = ReadIni()
def __new__(cls, *args, **kwargs):
if cls.__instance is None:
try:
# 创建连接池
cls.pool = PooledDB(
creator=pymysql,
host=cls.ini.get_mysql_connect_msg("host"),
port=int(cls.ini.get_mysql_connect_msg("port")),
user=cls.ini.get_mysql_connect_msg("user"),
password=cls.ini.get_mysql_connect_msg("password"),
database=cls.ini.get_mysql_connect_msg("database"),
maxconnections=10,
mincached=5,
blocking=True,
charset="utf8mb4"
)
except Exception as e:
raise e
else:
cls.__instance = super().__new__(cls, *args, **kwargs)
return cls.__instance
def __init__(self):
"""获取链接对象"""
self.conn = self.pool.connection()
def close(self):
"""关闭链接对象"""
self.conn.close()
创建delete方法
def delete(self, sql_sentence):
"""执行删除的sql语句"""
# 判断sql语句是否合法
if isinstance(sql_sentence, str) and sql_sentence.lower().startswith("delete"):
# 尝试执行删除的sql语句
try:
# 获取游标对象
with self.conn.cursor() as cursor:
# 执行sql语句
cursor.execute(sql_sentence)
except Exception as e:
raise e
else:
# 使用链接对象提交
self.conn.commit()
else:
raise ValueError("删除的sql语句错误")
创建select方法
def select(self, sql_sentence):
"""执行查询的sql语句"""
# 判断sql语句是否合法
if isinstance(sql_sentence, str) and sql_sentence.lower().startswith("select"):
# 尝试执行sql语句
try:
# 获取游标对象
with self.conn.cursor() as cursor:
# 执行sql语句
cursor.execute(sql_sentence)
except Exception as e:
raise e
else:
# 如果没有报错,获取查询的结果
select_res = cursor.fetchone()
if select_res:
return select_res[0]
else:
raise ValueError("查询的sql语句错误")
read_excel.py
整理思路:增加sql语句获取的方法、sql类型获取的方法、需要更新数据获取的方法,最后将这几句追加到二维列表中
创建sql_data方法
def sql_data(self, row):
"""根据行,获取sql语句"""
sql_data_key = self.__get_cell_value("k", row)
module = self.module_name(row)
api = self.api_name(row)
if module and api and sql_data_key:
try:
return self.sql_data_dict[module][api][sql_data_key]
except Exception as e:
raise e
创建sql_type方法
def sql_type(self, row):
"""根据行,获取sql语句的类型"""
# 获取sql语句类型的值
value = self.__get_cell_value("l", row)
# 判断sql语句类型的值是否不为None,如果不为None,将值转小写再返回,如果为None,返回None
if value:
return value.lower()
创建updata_key方法
def update_key(self, row):
"""根据行,获取更新的key"""
return self.__get_cell_value("m", row)
追加到get_data方法中
def get_data(self):
"""将测试使用的数据,存放在一个二维列表中"""
list_data = []
for row in range(2, self.ws.max_row + 1):
req = self.case_req(row) # 请求方法
mime = self.case_mime(row) # 媒体类型
url = self.case_url(row) # url
case = self.case_data(row) # 用例数据
expect = self.expect_data(row) # 期望数据
sql = self.sql_data(row) # sql语句
sql_type = self.sql_type(row) # sql语句类型
update_key = self.update_key(row) # 更新的key
# 过滤空行
if req and url and expect:
list_data.append([req, mime, url, case, expect, sql, sql_type, update_key])
print([req, mime, url, case, expect, sql, sql_type, update_key])
else:
return list_data
请求层
整理思路:在请求层的conftest文件增加db_fix装饰器,将其设置为session等级的作用域,使其能在参数化pytest的方法执行前连接数据库执行后关闭数据库。在test_bpm.py中加入判断请求中是否含有select、delete、select|delete字段,如果有就执行sql语句
conftest.py
@pytest.fixture(scope="session")
def db_fix():
# 创建DB类对象
db = DB()
yield db
db.close()
tset_bpm.py
import pytest
from APIAutoTest_v2.common.read_excel import ReadExcel
datas = ReadExcel().get_data()
class TestBPM:
@pytest.mark.parametrize("req, mime, url, case, expect, sql, sql_type, update_key", datas)
def test_bpm(self, req_fix, db_fix, req, mime, url, case, expect, sql, sql_type, update_key):
# 判断sql语句的类型是否为delete,如果是,使用DB类对象调用delete方法执行删除的sql语句
if sql_type == "delete":
# 使用DB类对象调用delete方法执行删除的sql语句--db_fix就是DB类对象
db_fix.delete(sql['delete'])
# 判断sql语句的类型是否为select, 如果是,使用DB类对象调用select方法执行,查询的sql语句
elif sql_type == "select":
# 使用DB类对象调用select方法执行,查询的sql语句, 并获取查询的结果
select_result = db_fix.select(sql["select"])
# 判断用例数据的类型是否为字符串
if isinstance(case, str):
# 用例数据为字符串,字符串被替换之后,会得到一个新的字符串,需要使用原来的变量接收新的用例数据
case = case.replace(update_key, select_result)
else:
# case[update_key] = select_result
case[update_key] = select_result
# 判断sql语句的类型是否为select|delete 或者为 delete|select
elif sql_type == "select|delete" or sql_type == "delete|select":
# 使用DB类对象调用delete方法执行删除的sql语句--db_fix就是DB类对象
db_fix.delete(sql['delete'])
# 使用DB类对象调用select方法执行,查询的sql语句, 并获取查询的结果
select_result = db_fix.select(sql["select"])
# 判断用例数据的类型是否为字符串
if isinstance(case, str):
# 用例数据为字符串,字符串被替换之后,会得到一个新的字符串,需要使用原来的变量接收新的用例数据
case = case.replace(update_key, select_result)
else:
# case[update_key] = select_result
case[update_key] = select_result
# # 使用RequestMethod类对象发生请求,自定义固件req_fix就是RequestMethod类的对象
res = req_fix.request(req_method=req, req_url=url, req_mime=mime, case_data=case)
# 断言
for key in expect.keys():
assert expect[key] in res.json().get(key)

Comments NOTHING