当前位置:网站首页>【接口自动化】
【接口自动化】
2022-08-10 05:35:00 【蓝尼亚】
一、接口自动化布局

零、数据库
import os
import configparser
import pymysql
# ========Reading db_config.ini setting ========
# base_dir = os.path.dirname(os.path.abspath(__file__))
# print(base_dir)
# base_dir = base_dir.replace("\\", "/")
# file_path = base_dir + "/db_config.ini"
# print(file_path)
# # cf = cparser.ConfigParser()
# cf = configparser.ConfigParser()
# cf.read(file_path)
# Load the MySQL connection settings from the [mysqlconf] section of
# db_config.ini. The path is relative to the current working directory.
cf = configparser.ConfigParser()
cf.read('../db_fixture/db_config.ini', encoding='utf-8')
host, port, db, user, password = [
    cf.get("mysqlconf", option)
    for option in ("host", "port", "db_name", "user", "password")
]
# print("host=", host,"port=",port,"db=",db,"user=",user,"passwprd=",password)
# ======== Mysql base operating ========
class DB:
    """Thin wrapper around one PyMySQL connection for test-fixture SQL.

    Table names, column lists, values and conditions are interpolated into
    the SQL text verbatim, so every argument must be a trusted, already
    quoted SQL fragment.
    NOTE(review): nothing here is parameterized; never pass user input.

    An instance can be used as a context manager; the connection is closed
    when the ``with`` block exits.
    """

    # Shared by insert() and init_data(); the trailing space is intentional
    # so the generated SQL text stays exactly as it has always been.
    _INSERT_TEMPLATE = 'INSERT INTO {} ({}) VALUES ({}) '

    def __init__(self):
        """Connect using the module-level host/port/user/password/db values.

        Raises:
            pymysql.err.OperationalError: if the connection cannot be opened.
                The error is printed and then re-raised, so a caller never
                receives an instance that lacks a ``connection`` attribute.
        """
        try:
            # Connect to the database
            self.connection = pymysql.Connect(host=str(host),
                                              port=int(port),
                                              user=str(user),
                                              password=str(password),
                                              db=str(db),
                                              charset="utf8mb4")
        except pymysql.err.OperationalError as e:
            print("Mysql Error %d: %s" % (e.args[0], e.args[1]))
            raise

    def __enter__(self):
        """Return this instance for use inside a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection; exceptions from the block are not suppressed."""
        self.close()
        return False

    def _execute_and_commit(self, real_sql):
        """Execute one write statement and commit it."""
        with self.connection.cursor() as cursor:
            cursor.execute(real_sql)
        self.connection.commit()

    def select_count(self, column_name, table_name, condition):
        """Run a select, print every fetched row and return the row count.

        The count is the value returned by ``cursor.execute`` (PyMySQL
        documents it as the number of affected rows).
        """
        real_sql = "select {} from {} where {};".format(column_name, table_name, condition)
        print(real_sql)
        with self.connection.cursor() as cursor:
            count = cursor.execute(real_sql)
            for row in cursor.fetchall():
                print(row)
        return count

    def clear(self, table_name, condition):
        """Delete the rows of ``table_name`` matching ``condition`` and commit."""
        real_sql = "delete from {} where {};".format(table_name, condition)
        print(real_sql)
        self._execute_and_commit(real_sql)
        print("The data --- %s--- has been deleted!" % condition)

    def insert(self, table_name, key, value):
        """Insert one row and commit.

        Args:
            table_name: target table.
            key: comma-separated column list.
            value: comma-separated, already quoted value list.
        """
        self._execute_and_commit(self._INSERT_TEMPLATE.format(table_name, key, value))

    def close(self):
        """Close the underlying connection."""
        self.connection.close()

    def init_data(self, table_name, key, value):
        """Insert one row like insert(), but print the SQL first."""
        real_sql = self._INSERT_TEMPLATE.format(table_name, key, value)
        print(real_sql)
        self._execute_and_commit(real_sql)

    def select_data(self, column_name, table_name, condition):
        """Run a select and return all fetched rows as a list."""
        real_sql = "select {} from {} where {} ;".format(column_name, table_name, condition)
        print(real_sql)
        with self.connection.cursor() as cursor:
            count = cursor.execute(real_sql)
            print('there has %s rows record' % count)
            select_data = list(cursor.fetchall())
            print(select_data)
        return select_data
if __name__ == '__main__':
    # Manual run: insert one sample holiday row into the fixture table.
    holiday_name = '元宵4'
    table_name = 'tt_ts_forecast_holiday'
    key = 'holiday_type, start_time, end_time, holiday_name, main_holiday_id'
    value = "'4','2018-01-03','2018-01-03','{}','175'".format(holiday_name)
    fixture_db = DB()
    try:
        fixture_db.init_data(table_name, key, value)  # reset the query data
    finally:
        # Release the connection even if the insert fails.
        fixture_db.close()
一、增加
import os,sys
import unittest
from common import method
from config import config
from db_fixture import mysql_db
# Log in to the home page. This runs once, when the module is imported;
# the return value is stored in `login` and not read again in this module.
login_url = config.login_url
login_data = config.login_data
login = method.login(login_url, login_data=login_data)
class AddHolidaies(unittest.TestCase):
    """Tests for adding a main holiday."""

    def setUp(self):
        self.base_url = config.t3_url
        # tearDown prints self.result; define it up front so a failure
        # before the first request does not raise AttributeError there.
        self.result = None

    def tearDown(self):
        print(self.result)

    def test_addMainHolidaies(self):
        """Add a holiday, then check that a range over 20 days is rejected."""
        jsondata = method.fetch_jsondata_from_file('../file/t3')

        # 1. Normal case
        holiday_name = '元宵4'
        jsondata['holidayName'] = holiday_name
        t3_column_name = '*'
        t3_data_base = 'tt_ts_forecast_holiday'
        t3_condition = 'holiday_name = "{}" and is_delete=0 '.format(holiday_name)
        # Remove duplicates from the database before sending the request.
        method.database_de_duplication(t3_column_name,
                                       t3_data_base,
                                       t3_condition)
        self.result = method.postdata(self.base_url, jsondata)  # send the request
        try:
            self.assertEqual(0, self.result['code'])
            self.assertEqual(1, self.result['data'])
            self.assertEqual('SUCCESS', self.result['message'])
        finally:
            # Remove the row that was just added, even when an assertion
            # fails, so later runs start from a clean table.
            mysql_db.DB().clear(t3_data_base, t3_condition)

        # 2. Date range longer than 20 days
        new_req = method.modify_value(jsondata, startTime='2018-04-01', endTime='2018-04-22')
        self.result = method.postdata(self.base_url, new_req)
        self.assertEqual(2001103, self.result['code'])
        self.assertEqual(0, self.result['data'])
        message = 'The number of days between the start and end dates cannot be greater than 20'
        self.assertEqual(message, self.result['message'])
if __name__ == "__main__":
# test_data.init_data() # 初始化接口测试数据
unittest.main() # 运行测试集
二、查
import unittest
from config import config
from db_fixture import mysql_db
from common import method
# Log in. This runs once, when the module is imported; the return value is
# stored in `login` and not read again in this module.
login_url = config.login_url
login_data = config.login_data
login = method.login(login_url, login_data=login_data)
class ListHolidaies(unittest.TestCase):
    """Tests for the holiday listing endpoint."""

    def setUp(self):
        self.t4_url = config.t4_url
        # tearDown prints self.result; define it up front so a failure
        # before the request does not raise AttributeError there.
        self.result = None

    def tearDown(self):
        print(self.result)

    def test_listHolidaies(self):
        """Insert a known holiday row and check that the listing returns it."""
        holiday_name = '元宵4'
        t3_column_name = '*'
        t3_data_base = 'tt_ts_forecast_holiday'
        t3_condition = 'holiday_name = "{}" and is_delete=0 '.format(holiday_name)
        # De-duplicate before seeding the row.
        method.database_de_duplication(t3_column_name,
                                       t3_data_base,
                                       t3_condition)
        key = 'holiday_type, start_time, end_time, holiday_name, main_holiday_id, update_time, create_time, is_delete'
        value = ("'4','2018-01-03','2018-01-03','{}','175', '2019-03-29 22:49:11', "
                 "'2019-03-29 22:49:11', '0'").format(holiday_name)
        mysql_db.DB().init_data(t3_data_base, key, value)  # reset the query data
        try:
            self.result = method.postdata(self.t4_url, method.fetch_jsondata_from_file('../file/t4'))
            print(self.result)
            # Compare the response with the seeded row.
            self.assertEqual(0, self.result['code'])
            self.assertEqual('SUCCESS', self.result['message'])
            self.assertIn(holiday_name, str(self.result['data']))
        finally:
            # Clean up the seeded row even when the request or an
            # assertion fails.
            method.database_de_duplication(t3_column_name,
                                           t3_data_base,
                                           t3_condition)
if __name__ == '__main__':
    # Run this module's tests when the file is executed directly.
    unittest.main()
三、删除
import os,sys
import unittest
from db_fixture import mysql_db
from config import config
from common import method
import copy
# Put the directory two levels above this file at the front of sys.path.
# NOTE(review): the project imports earlier in this module run before this
# line, so the path change cannot help them resolve; confirm it is needed.
parentpath = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, parentpath)
# Log in to the home page (runs once, when the module is imported).
login_url = config.login_url
login_data = config.login_data
login = method.login(login_url, login_data=login_data)
class deleteHoliday_test(unittest.TestCase):
    """Tests for the holiday deletion endpoint."""

    def setUp(self):
        self.base_url = config.t5_url
        # tearDown prints self.result; define it up front so a failure
        # before the request does not raise AttributeError there.
        self.result = None

    def tearDown(self):
        print(self.result)

    def test_deleteHoliday(self):
        """Seed one holiday row, then delete it through the API."""
        # De-duplicate
        holiday_name = '元宵4'
        t3_column_name = '*'
        t3_data_base = 'tt_ts_forecast_holiday'
        t3_condition = 'holiday_name = "{}" and is_delete=0 '.format(holiday_name)
        main_holiday_id = '175'
        method.database_de_duplication(t3_column_name,
                                       t3_data_base,
                                       t3_condition)

        # Reset the query data
        key = 'holiday_type, start_time, end_time, holiday_name, main_holiday_id, update_time, create_time, is_delete'
        value = ("'4','2018-01-03','2018-01-03','{}',{}, '2019-03-29 22:49:11', "
                 "'2019-03-29 22:49:11', '0'").format(holiday_name, main_holiday_id)
        mysql_db.DB().init_data(t3_data_base, key, value)

        # Build the request from the id of the newest matching row
        id_condition = 'holiday_name = "{}" AND is_delete = 0 order by id DESC'.format(holiday_name)
        id_rows = mysql_db.DB().select_data('id', t3_data_base, id_condition)
        # Fail with a readable message instead of an IndexError when the
        # seeded row cannot be found.
        self.assertTrue(id_rows, 'seeded holiday row was not found in the database')
        id_select = id_rows[0][0]
        json_data = method.fetch_jsondata_from_file('../file/t5')
        json_data["id"] = int(id_select)
        json_data["mainHolidayId"] = int(main_holiday_id)

        # Same URL as config.t5_url; use the value prepared in setUp.
        self.result = method.postdata(self.base_url, json_data)
        self.assertEqual(0, self.result['code'])
        self.assertEqual(1, self.result['data'])
        self.assertEqual('SUCCESS', self.result['message'])
if __name__ == "__main__":
    # test_data.init_data()  # initialise the API test data (disabled)
    unittest.main()  # run the test suite
四、改
import unittest
from config import config
from common import method
from db_fixture import mysql_db
import os
import sys
# Put the directory two levels above this file at the front of sys.path.
# NOTE(review): the project imports earlier in this module run before this
# line, so the path change cannot help them resolve; confirm it is needed.
parentpath = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, parentpath)
# Log in to the home page (runs once, when the module is imported).
login_url = config.login_url
login_data = config.login_data
login = method.login(login_url, login_data=login_data)
class updateHoliday_test(unittest.TestCase):
    """Tests for the holiday update endpoint."""

    def setUp(self):
        # unittest only calls the hook spelled exactly ``setUp``; the former
        # ``SetUp`` spelling was never run, so self.t6_url was missing.
        self.t6_url = config.t6_url
        # tearDown prints self.result; define it up front so a failure
        # before the request does not raise AttributeError there.
        self.result = None

    def tearDown(self):
        print(self.result)

    # Old, mis-capitalised names kept as aliases for any direct callers.
    SetUp = setUp
    TearDown = tearDown

    def test_updateHoliday(self):
        """Seed one holiday row, update it through the API, check the reply."""
        # De-duplicate
        holiday_name = '元宵4'
        t3_column_name = '*'
        t3_data_base = 'tt_ts_forecast_holiday'
        t3_condition = 'holiday_name = "{}" and is_delete=0 '.format(holiday_name)
        main_holiday_id = '175'
        method.database_de_duplication(t3_column_name,
                                       t3_data_base,
                                       t3_condition)

        # Reset the data
        key = 'holiday_type, start_time, end_time, holiday_name, main_holiday_id, update_time, create_time, is_delete'
        value = ("'4','2018-01-03','2018-01-03','{}',{}, '2019-03-29 22:49:11', "
                 "'2019-03-29 22:49:11', '0'").format(holiday_name, main_holiday_id)
        mysql_db.DB().init_data(t3_data_base, key, value)
        select_id_data = mysql_db.DB().select_data('id', t3_data_base, t3_condition)  # fetch the id

        # Build the request data
        json_data = method.fetch_jsondata_from_file('../file/t6')
        select_id = select_id_data[0][0]
        json_data['id'] = int(select_id)
        self.result = method.postdata(self.t6_url, json_data)

        # Read start_time back on a fresh connection and print it; it is
        # shown for inspection only and is not asserted.
        column = 'start_time'
        condition = 'id = {}'.format(select_id)
        select_data = mysql_db.DB().select_data(column, t3_data_base, condition)
        print(select_data)

        self.assertEqual(self.result['code'], 0)
        self.assertEqual(self.result['data'], 1)
        self.assertEqual(self.result['message'], 'SUCCESS')
if __name__ == '__main__':
    # Run this module's tests when the file is executed directly.
    unittest.main()
边栏推荐
猜你喜欢
随机推荐
Pytorch配置与实战--Tips
Machine Learning - Clustering - Shopping Mall Customer Clustering
【烘焙】肉松蛋糕卷
微信小程序-小程序的宿主环境
tinymce rich text editor
PyTorch之CV
[Difference between el and template]
Exploratory Data Analysis EDA
pytorch-05.用pytorch实现线性回归
LeetCode 1894.找到需要补充粉笔的学生编号
el-dropdown drop-down menu style modification, remove the small triangle
LeetCode 1351. Counting Negative Numbers in Ordered Matrices (Simple)
深度学习阶段性报告(一)
Explain the principle of MySql index in detail
我不喜欢我的代码
Flutter的生命周期
探索性数据分析EDA
详解 Hough 变换(上)基本原理与直线检测
LeetCode 292.Nim 游戏(简单)
开源免费WMS仓库管理系统【推荐】









