当前位置:网站首页>使用pymongo保存数据到MongoDB的工具类
使用pymongo保存数据到MongoDB的工具类
2022-08-08 15:34:00 【呆萌的代Ma】
pymongo 的增删改查请参考:Python连接MongoDB,使用pymongo进行增删改查
在此基础上,引入多线程(pymongo 本身是线程安全的),封装成一个工具类,可以直接将一个 DataFrame 保存、更新或覆盖写入 MongoDB:
工具类
from pymongo import MongoClient
import pandas as pd
import numpy as np
class MongoHelper(object):
    """Convenience wrapper around a single pymongo collection.

    Saves, updates or overwrites documents that come either from a plain
    dict or from the rows of a pandas DataFrame.
    """

    # Upper bound on the worker threads used by ``update_dataframe``; one
    # thread per row does not scale to large frames.
    _MAX_WORKERS = 16

    def __init__(self, database, collection=None, host="127.0.0.1", port=27017):
        """Connect to MongoDB and optionally select a collection.

        :param database: name of the database to use
        :param collection: optional collection name; can also be chosen
            later with ``set_collection``
        :param host: MongoDB host
        :param port: MongoDB port
        """
        mongo_conn = MongoClient(host=host, port=port)
        # Database handle (pymongo ships with its own connection pool).
        self.mongo_db = mongo_conn.get_database(database)
        self.coll = None
        if collection:
            self.coll = self.mongo_db.get_collection(collection)

    def set_collection(self, collection):
        """Select the collection that subsequent calls operate on."""
        self.coll = self.mongo_db.get_collection(collection)

    def get_coll(self):
        """Return the currently selected collection (``None`` if none is set)."""
        return self.coll

    def _save_or_update_mongodb(self, record, dict_value, drop=False):
        """Insert, overwrite or update a single document.

        :param record: the existing database record, or a falsy value when
            nothing was found
        :param dict_value: the data to save
        :param drop: when a record exists, delete it and insert
            ``dict_value`` instead of applying a ``$set`` update
        """
        coll = self.get_coll()
        if not record:
            # Nothing stored yet: plain insert.
            coll.insert_one(dict_value)
        elif drop is True:
            # Delete then insert, i.e. overwrite the stored document.
            coll.delete_one(record)
            coll.insert_one(dict_value)
        else:
            coll.update_one(record, {"$set": dict_value})

    def get_all_data(self, record_list=["_id"]):
        """Yield every document of the collection.

        :param record_list: names of the fields to return; handed to
            ``find`` as its projection argument
        """
        # The default list is only read, never mutated, so sharing it
        # between calls is harmless.
        yield from self.get_coll().find({}, record_list)

    def save_new_dataframe_to_mongo(self, dataframe: pd.DataFrame):
        """Insert every row of ``dataframe`` as a new document.

        An ``_id`` column, when present, is used as the document id.
        An empty frame is a no-op.
        """
        # "records" is the valid orient name; the abbreviation "record"
        # is rejected by current pandas releases.
        documents = dataframe.to_dict(orient="records")
        if not documents:
            # insert_many refuses an empty document list.
            return
        self.get_coll().insert_many(documents)

    def update_dataframe(self, dataframe: pd.DataFrame, record_by: list = ["_id"], drop=False):
        """Insert or update every row of ``dataframe`` using a thread pool.

        Each row is handled like a call to ``update_dict``.

        :param dataframe: rows to write
        :param record_by: column names used to look up the existing
            document; names missing from the frame are ignored
        :param drop: overwrite (delete + insert) instead of ``$set`` update
        :raises Exception: the first error raised by any row's write
        """
        from concurrent.futures import ThreadPoolExecutor

        rows = dataframe.to_dict(orient="records")
        if not rows:
            return
        worker_count = min(self._MAX_WORKERS, len(rows))
        with ThreadPoolExecutor(max_workers=worker_count) as pool:
            futures = [
                pool.submit(self.update_dict, row, record_by, drop)
                for row in rows
            ]
            for future in futures:
                # result() re-raises a worker's exception instead of
                # letting it vanish inside the thread.
                future.result()

    def update_dict(self, new_dict_value: dict, record_by: list = ["_id"], drop=False):
        """Insert or update one document described by a dict.

        :param new_dict_value: the data to write
        :param record_by: keys used to look up the existing document; keys
            missing from ``new_dict_value`` are ignored
        :param drop: overwrite (delete + insert) instead of ``$set`` update
        """
        find_dict = {
            key: new_dict_value[key]
            for key in record_by
            if key in new_dict_value
        }
        # An empty filter would match an arbitrary stored document, which
        # would then be updated or deleted by mistake; without any lookup
        # key the data is treated as new.
        record = self.get_coll().find_one(find_dict, {}) if find_dict else None
        self._save_or_update_mongodb(record, new_dict_value, drop=drop)
示例代码
from pymongo import MongoClient
import pandas as pd
import numpy as np
class MongoHelper(object):
    """Convenience wrapper around a single pymongo collection.

    Saves, updates or overwrites documents that come either from a plain
    dict or from the rows of a pandas DataFrame.
    """

    # Upper bound on the worker threads used by ``update_dataframe``; one
    # thread per row does not scale to large frames.
    _MAX_WORKERS = 16

    def __init__(self, database, collection=None, host="127.0.0.1", port=27017):
        """Connect to MongoDB and optionally select a collection.

        :param database: name of the database to use
        :param collection: optional collection name; can also be chosen
            later with ``set_collection``
        :param host: MongoDB host
        :param port: MongoDB port
        """
        mongo_conn = MongoClient(host=host, port=port)
        # Database handle (pymongo ships with its own connection pool).
        self.mongo_db = mongo_conn.get_database(database)
        self.coll = None
        if collection:
            self.coll = self.mongo_db.get_collection(collection)

    def set_collection(self, collection):
        """Select the collection that subsequent calls operate on."""
        self.coll = self.mongo_db.get_collection(collection)

    def get_coll(self):
        """Return the currently selected collection (``None`` if none is set)."""
        return self.coll

    def _save_or_update_mongodb(self, record, dict_value, drop=False):
        """Insert, overwrite or update a single document.

        :param record: the existing database record, or a falsy value when
            nothing was found
        :param dict_value: the data to save
        :param drop: when a record exists, delete it and insert
            ``dict_value`` instead of applying a ``$set`` update
        """
        coll = self.get_coll()
        if not record:
            # Nothing stored yet: plain insert.
            coll.insert_one(dict_value)
        elif drop is True:
            # Delete then insert, i.e. overwrite the stored document.
            coll.delete_one(record)
            coll.insert_one(dict_value)
        else:
            coll.update_one(record, {"$set": dict_value})

    def get_all_data(self, record_list=["_id"]):
        """Yield every document of the collection.

        :param record_list: names of the fields to return; handed to
            ``find`` as its projection argument
        """
        # The default list is only read, never mutated, so sharing it
        # between calls is harmless.
        yield from self.get_coll().find({}, record_list)

    def save_new_dataframe_to_mongo(self, dataframe: pd.DataFrame):
        """Insert every row of ``dataframe`` as a new document.

        An ``_id`` column, when present, is used as the document id.
        An empty frame is a no-op.
        """
        # "records" is the valid orient name; the abbreviation "record"
        # is rejected by current pandas releases.
        documents = dataframe.to_dict(orient="records")
        if not documents:
            # insert_many refuses an empty document list.
            return
        self.get_coll().insert_many(documents)

    def update_dataframe(self, dataframe: pd.DataFrame, record_by: list = ["_id"], drop=False):
        """Insert or update every row of ``dataframe`` using a thread pool.

        Each row is handled like a call to ``update_dict``.

        :param dataframe: rows to write
        :param record_by: column names used to look up the existing
            document; names missing from the frame are ignored
        :param drop: overwrite (delete + insert) instead of ``$set`` update
        :raises Exception: the first error raised by any row's write
        """
        from concurrent.futures import ThreadPoolExecutor

        rows = dataframe.to_dict(orient="records")
        if not rows:
            return
        worker_count = min(self._MAX_WORKERS, len(rows))
        with ThreadPoolExecutor(max_workers=worker_count) as pool:
            futures = [
                pool.submit(self.update_dict, row, record_by, drop)
                for row in rows
            ]
            for future in futures:
                # result() re-raises a worker's exception instead of
                # letting it vanish inside the thread.
                future.result()

    def update_dict(self, new_dict_value: dict, record_by: list = ["_id"], drop=False):
        """Insert or update one document described by a dict.

        :param new_dict_value: the data to write
        :param record_by: keys used to look up the existing document; keys
            missing from ``new_dict_value`` are ignored
        :param drop: overwrite (delete + insert) instead of ``$set`` update
        """
        find_dict = {
            key: new_dict_value[key]
            for key in record_by
            if key in new_dict_value
        }
        # An empty filter would match an arbitrary stored document, which
        # would then be updated or deleted by mistake; without any lookup
        # key the data is treated as new.
        record = self.get_coll().find_one(find_dict, {}) if find_dict else None
        self._save_or_update_mongodb(record, new_dict_value, drop=drop)
def main():
    """Demo run: store 100 random rows, then push a second random frame
    carrying the same ``_id`` values through ``update_dataframe``."""
    row_total = 100
    id_values = list(map(str, range(row_total)))
    helper = MongoHelper(database="test_db", collection="test_coll")

    # First frame: columns a / b, saved as brand-new documents.
    initial = pd.DataFrame(np.random.randn(row_total, 2), columns=["a", "b"])
    initial["_id"] = id_values
    helper.save_new_dataframe_to_mongo(initial)

    # Second frame: changed data (columns a / c) for the same ids.
    changed = pd.DataFrame(np.random.randn(row_total, 2), columns=["a", "c"])
    changed["_id"] = list(id_values)
    helper.update_dataframe(changed)


if __name__ == "__main__":
    main()
如果是
边栏推荐
- 文档管理系统:攻克这3个痛点,解决80%企业文档管理难题
- “科林明伦杯”哈尔滨理工大学暑假训练赛 B吃雪糕 (异或思维题)(补题)
- Introduction to Recurrent Neural Network (RNN)
- 带你玩转“超大杯”ECS特性及实验踩坑【华为云至简致远】
- Zhaoqi Technology Innovation and Entrepreneurship Event Event Platform, Investment and Financing Matchmaking, Online Live Roadshow
- [Online interviewer] How to achieve deduplication and idempotency
- JS-BOM-for, if (string to case)
- Smobiler的复杂控件的由来与创造
- Elegantly detect and update web applications in real time
- 企业开发小程序有什么优势?为什么要开发小程序?
猜你喜欢
Streamsets Data Collector 3.12
JS-BOM-factorial calculation
Take you to play with the "Super Cup" ECS features and experiment on the pit [HUAWEI CLOUD is simple and far]
Power BI简介
ThinkPHP3.2链接带中文参数乱码导致分页数据错误
腾讯又一长达 8 年的服务下架。。。
leetcode--541. 反转字符串II
Shell三剑客之sed命令详解
跟我一起来学弹性云服务器ECS【华为云至简致远】
连这几个网站都不知道,怪不得你的消息比别人落后
随机推荐
带你玩转“超大杯”ECS特性及实验踩坑【华为云至简致远】
线程本地存储 ThreadLocal
Ubuntu下使用sudo dpkg --configure -a后数据库出现问题
bzoj3262 陌上花开
web automation headless mode
彻底理解 volatile 关键字及应用场景,面试必问,小白都能看懂!
Mysql数据库入门学习笔记
Thoroughly understand the volatile keyword and application scenarios, and it is a must for interviews, and Xiaobai can understand it!
携手数字创新 共筑国产生态 7月份AntDB与5款产品完成互认证
C. Build Permutation(构造/数论)
瑞吉外卖学习笔记3
JS - BOM - - can be achieved through calculation or default values
CS231n: 6 training neural network (2)
基于Qt设计的课堂考勤系统(采用RDS for MySQL云数据库 )【华为云至简致远】
【有奖征文 第13期】至简致远,“云”响世界,大胆秀出你的华为云技术主张,高额激励等你拿
跟我一起来学弹性云服务器ECS【华为云至简致远】
Is it safe to open an account in China Galaxy Securities?
想去银行测试?那这套题目你必须要会
腾讯超大 Apache Pulsar 集群的客户端性能调优实践
bzoj3693 round table hall theorem + segment tree