当前位置:网站首页>【接口自动化】
【接口自动化】
2022-08-10 05:35:00 【蓝尼亚】
一、 接口 自动化布局

零、数据库
import os
import configparser
import pymysql
# ========Reading db_config.ini setting ========
# base_dir = os.path.dirname(os.path.abspath(__file__))
# print(base_dir)
# base_dir = base_dir.replace("\\", "/")
# file_path = base_dir + "/db_config.ini"
# print(file_path)
# # cf = cparser.ConfigParser()
# cf = configparser.ConfigParser()
# cf.read(file_path)
cf = configparser.ConfigParser()
cf.read('../db_fixture/db_config.ini', encoding='utf-8')
host = cf.get("mysqlconf", "host")
port = cf.get("mysqlconf", "port")
db = cf.get("mysqlconf", "db_name")
user = cf.get("mysqlconf", "user")
password = cf.get("mysqlconf", "password")
# print("host=", host,"port=",port,"db=",db,"user=",user,"passwprd=",password)
# ======== Mysql base operating ========
class DB:
def __init__(self):
try:
# Connect to the database
self.connection = pymysql.Connect(host=str(host),
port=int(port),
user=str(user),
password=str(password),
db=str(db),
charset="utf8mb4")
except pymysql.err.OperationalError as e:
print("Mysql Error %d: %s" % (e.args[0], e.args[1]))
# select count(data) from table
def select_count(self, column_name, table_name, condition):
real_sql = "select {} from {} where {};".format(column_name, table_name, condition)
print(real_sql)
with self.connection.cursor() as cursor:
count = cursor.execute(real_sql)
# print('there has %s rows record' %count)
result = cursor.fetchall()
#select_result = []
for i in result:
print(i)
#m = select_result.append(i)
#print('select_result',m)
cursor.close()
return count
# clear table data
def clear(self, table_name, condition):
# real_sql = "truncate table" + table_name + ";"
# real_sql = "delete from " + table_name + " where "+condition+";"
real_sql = "delete from {} where {};".format(table_name, condition)
print(real_sql)
with self.connection.cursor() as cursor:
cursor.execute(real_sql)
print("The data --- %s--- has been deleted!" %condition)
self.connection.commit()
# insert sql statement
def insert(self, table_name, key, value):
# for key in table_data:
# table_data[key] = " '" + str(table_data[key]) + "'"
# key = ','.join(table_data.keys())
# value = ','.join(table_data.values())
# real_sql = 'INSERT INTO {} ({}) VALUES ({}) '.format(table_name, key, value)
# print(real_sql)
# with self.connection.cursor() as cursor:
# cursor.execute(real_sql)
# for key, value in table_data.items():
# real_sql = 'INSERT INTO {} ({}) VALUES ({}) '.format(table_name, key, value)
# print(real_sql)
# with self.connection.cursor() as cursor:
# cursor.execute(real_sql)
real_sql = 'INSERT INTO {} ({}) VALUES ({}) '.format(table_name, key, value)
with self.connection.cursor() as cursor:
cursor.execute(real_sql)
self.connection.commit()
# close database
def close(self):
self.connection.close()
# init data
def init_data(self, table_name, key, value):
# for table, data in datas.items():
# #self.clear(table)
# for d in data:
# self.insert(table, d)
# self.close()
real_sql = 'INSERT INTO {} ({}) VALUES ({}) '.format(table_name, key, value)
print(real_sql)
with self.connection.cursor() as cursor:
cursor.execute(real_sql)
self.connection.commit()
# select data from table
def select_data(self, column_name, table_name, condition):
# real_sql = "select *" + from + table_name + where + condition +";"
# real_sql = "select " + column_name + " from " + table_name + " where " + condition + ";"
real_sql = "select {} from {} where {} ;".format(column_name,table_name,condition)
print(real_sql)
with self.connection.cursor() as cursor:
count = cursor.execute(real_sql)
print('there has %s rows record' % count)
result = cursor.fetchall()
select_data=[]
for i in result:
select_data.append(i)
print(select_data)
cursor.close()
return list(select_data)
if __name__ == '__main__':
holiday_name = '元宵4'
datas = {'start_time': '2018-01-03', 'end_time': '2018-01-03', 'holiday_name': holiday_name,
'holiday_type': '4', 'main_holiday_id': '175'}
table_name = 'tt_ts_forecast_holiday'
key = 'holiday_type, start_time, end_time, holiday_name, main_holiday_id'
value = "'4','2018-01-03','2018-01-03','元宵4','175'"
DB().init_data(table_name, key, value) # 重置查询数据
一、增加
import os,sys
import unittest
from common import method
from config import config
from db_fixture import mysql_db
#登录主页
login_url = config.login_url
login_data = config.login_data
login = method.login(login_url, login_data=login_data)
class AddHolidaies (unittest.TestCase):
"""添加主假日"""
def setUp(self):
self.base_url = config.t3_url
def tearDown(self):
print(self.result)
def test_addMainHolidaies(self):
jsondata = method.fetch_jsondata_from_file('../file/t3')
# 1.一般情况
holiday_name = '元宵4'
jsondata['holidayName'] = holiday_name
t3_column_name = '*'
t3_data_base = 'tt_ts_forecast_holiday'
t3_condition = 'holiday_name = "{}" and is_delete=0 '.format(holiday_name)
method.database_de_duplication(t3_column_name,
t3_data_base,
t3_condition) # 数据库去重
self.result = method.postdata(self.base_url, jsondata) # 发送请求
self.assertEqual(0, self.result['code'])
self.assertEqual(1, self.result['data'])
self.assertEqual('SUCCESS', self.result['message'])
mysql_db.DB().clear(t3_data_base, t3_condition) # 清除刚添加的数据
# 2.日期大于20天
new_req = method.modify_value(jsondata, startTime='2018-04-01', endTime='2018-04-22')
self.result = method.postdata(self.base_url, new_req)
self.assertEqual(2001103, self.result['code'])
self.assertEqual(0, self.result['data'])
message = 'The number of days between the start and end dates cannot be greater than 20'
self.assertEqual(message, self.result['message'])
if __name__ == "__main__":
# test_data.init_data() # 初始化接口测试数据
unittest.main() # 运行测试集二、查
import unittest
from config import config
from db_fixture import mysql_db
from common import method
# 登录
login_url = config.login_url
login_data = config.login_data
login = method.login(login_url, login_data=login_data)
class ListHolidaies(unittest.TestCase):
def setUp(self):
self.t4_url = config.t4_url
def tearDown(self):
print(self.result)
def test_listHolidaies(self):
holiday_name = '元宵4'
t3_column_name = '*'
t3_data_base = 'tt_ts_forecast_holiday'
t3_condition = 'holiday_name = "{}" and is_delete=0 '.format(holiday_name)
method.database_de_duplication(t3_column_name,
t3_data_base,
t3_condition) # 去重
key = 'holiday_type, start_time, end_time, holiday_name, main_holiday_id, update_time, create_time, is_delete'
value = "'4','2018-01-03','2018-01-03','元宵4','175', '2019-03-29 22:49:11', '2019-03-29 22:49:11', '0'"
mysql_db.DB().init_data(t3_data_base, key, value) # 重置查询数据
self.result = method.postdata(self.t4_url, method.fetch_jsondata_from_file('../file/t4'))
print(self.result)
# 提取查询结果 ,对比返回结果
self.assertEqual(0, self.result['code'])
self.assertEqual('SUCCESS', self.result['message'])
self.assertIn('元宵4', str(self.result['data']))
method.database_de_duplication(t3_column_name,
t3_data_base,
t3_condition)
if __name__ == '__main__':
unittest.main()
3.删除
import os,sys
import unittest
from db_fixture import mysql_db
from config import config
from common import method
import copy
parentpath = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, parentpath)
# 登录主页
login_url = config.login_url
login_data = config.login_data
login = method.login(login_url, login_data=login_data)
class deleteHoliday_test(unittest.TestCase):
def setUp(self):
self.base_url = config.t5_url
def tearDown(self):
print(self.result)
def test_deleteHoliday(self):
"""去重"""
holiday_name = '元宵4'
t3_column_name = '*'
t3_data_base = 'tt_ts_forecast_holiday'
t3_condition = 'holiday_name = "{}" and is_delete=0 '.format(holiday_name)
main_holiday_id = '175'
method.database_de_duplication(t3_column_name,
t3_data_base,
t3_condition) # 去重
"""重置查询数据"""
key = 'holiday_type, start_time, end_time, holiday_name, main_holiday_id, update_time, create_time, is_delete'
value = "'4','2018-01-03','2018-01-03','元宵4',{}, '2019-03-29 22:49:11', " \
"'2019-03-29 22:49:11', '0'".format(main_holiday_id)
mysql_db.DB().init_data(t3_data_base, key, value) # 重置查询数据
"""设置测试数据"""
id_condition = 'holiday_name = "元宵4" AND is_delete = 0 order by id DESC'
id_select = mysql_db.DB().select_data('id', t3_data_base, id_condition)
id_select = id_select[0][0]
json_data = method.fetch_jsondata_from_file('../file/t5')
json_data["id"] = int(id_select)
json_data["mainHolidayId"] = int(main_holiday_id)
self.result = method.postdata(config.t5_url, json_data)
self.assertEqual(0, self.result['code'])
self.assertEqual(1, self.result['data'])
self.assertEqual('SUCCESS', self.result['message'])
if __name__ == "__main__":
# test_data.init_data() # 初始化接口测试数据
unittest.main() # 运行测试集
4.改
import unittest
from config import config
from common import method
from db_fixture import mysql_db
import os
import sys
parentpath = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, parentpath)
# 登录主页
login_url = config.login_url
login_data = config.login_data
login = method.login(login_url, login_data=login_data)
class updateHoliday_test(unittest.TestCase):
def SetUp(self):
self.t6_url = config.t6_url
def TearDown(self):
print(self.result)
def test_updateHoliday(self):
"""去重"""
holiday_name = '元宵4'
t3_column_name = '*'
t3_data_base = 'tt_ts_forecast_holiday'
t3_condition = 'holiday_name = "{}" and is_delete=0 '.format(holiday_name)
main_holiday_id = '175'
method.database_de_duplication(t3_column_name,
t3_data_base,
t3_condition) # 去重
"""重置数据"""
key = 'holiday_type, start_time, end_time, holiday_name, main_holiday_id, update_time, create_time, is_delete'
value = "'4','2018-01-03','2018-01-03','元宵4',{}, '2019-03-29 22:49:11', " \
"'2019-03-29 22:49:11', '0'".format(main_holiday_id)
mysql_db.DB().init_data(t3_data_base, key, value) # 重置查询数据
select_id_data = mysql_db.DB().select_data('id', t3_data_base, t3_condition) # 获取id
"""设置请求数据"""
json_data = method.fetch_jsondata_from_file('../file/t6')
select_id = select_id_data[0][0]
json_data['id'] = int(select_id)
self.result = method.postdata(self.t6_url, json_data)
coloum = 'start_time'
condition = 'id = {}'.format(select_id)
select_data = mysql_db.DB().select_data(coloum, t3_data_base, condition)
print(select_data)
self.assertEqual(self.result['code'], 0)
self.assertEqual(self.result['data'], 1)
self.assertEqual(self.result['message'], 'SUCCESS')
if __name__ == '__main__':
unittest.main()
边栏推荐
- A timeout error is reported when connecting to Nacos
- 栈和队列
- Deep learning TensorFlow entry environment configuration
- A little knowledge point every day
- pytorch-07.处理多维特征的输入
- 【图像识别】训练一个最最简单的AI使其识别Vtuber
- (Flutter报错)Cannot run with sound null safety, because the following dependencies
- Flutter的生命周期
- pytorch-11. Convolutional Neural Network (Advanced)
- 细说MySql索引原理
猜你喜欢
随机推荐
51单片机BH1750智能补光灯台灯光强光照恒流源LED控制系统
LeetCode refers to the offer 21. Adjust the order of the array so that the odd numbers are in front of the even numbers (simple)
win12 modify dns script
Notes for RNN
探索性数据分析EDA
STM32单片机OLED经典2048游戏单片机小游戏
pytorch-06.逻辑斯蒂回归
Consensus calculation and incentive mechanism
【简易笔记】PyTorch官方教程简易笔记 EP3
自定义View的流程总结学习
Flutter Package 插件开发
Common class BigDecimal
LeetCode 1351.统计有序矩阵中的负数(简单)
Explain the principle of MySql index in detail
A little knowledge point every day
pytorch-05. Implementing linear regression with pytorch
LeetCode 2011.执行操作后的变量值(简单)
STM32单片机手机APP蓝牙高亮RGB彩灯控制板任意颜色亮度调光
LeetCode 1894. Find the student number that needs to be supplemented with chalk
【简易笔记】PyTorch官方教程简易笔记 EP2








