当前位置:网站首页>使用pymongo保存数据到MongoDB的工具类
使用pymongo保存数据到MongoDB的工具类
2022-08-08 15:34:00 【呆萌的代Ma】
pymongo 的增删改查请参考:Python连接MongoDB,使用pymongo进行增删改查
在此基础上引入多线程(pymongo 的 MongoClient 是线程安全的),封装成一个工具类,可以直接把一个 DataFrame 保存、更新或覆盖到 MongoDB:
工具类
from pymongo import MongoClient
import pandas as pd
import numpy as np
class MongoHelper(object):
    """Helper for saving pandas DataFrames / dicts into one MongoDB collection.

    Holds a database handle and an optional "current" collection; every
    save/update method operates on that current collection (``self.coll``).
    """

    def __init__(self, database, collection=None, host="127.0.0.1", port=27017):
        """Connect to MongoDB and select a database and (optionally) a collection.

        :param database: name of the database to use
        :param collection: name of the collection to use; can also be set
            later with :meth:`set_collection`
        :param host: MongoDB server host
        :param port: MongoDB server port
        """
        # Keep the client on the instance so its connection pool can be closed.
        self.mongo_client = MongoClient(host=host, port=port)
        # Database handle; pymongo's client maintains its own connection pool.
        self.mongo_db = self.mongo_client.get_database(database)
        self.coll = None
        if collection:
            self.coll = self.mongo_db.get_collection(collection)

    def close(self):
        """Close the underlying MongoClient (and its connection pool)."""
        self.mongo_client.close()

    def set_collection(self, collection):
        """Switch the current collection to ``collection`` (a collection name)."""
        self.coll = self.mongo_db.get_collection(collection)

    def get_coll(self):
        """Return the current collection, or None if none has been selected."""
        return self.coll

    def _save_or_update_mongodb(self, record, dict_value, drop=False):
        """Write ``dict_value`` depending on whether a matching document exists.

        :param record: the existing document (used as the write filter), or a
            falsy value when no matching document was found
        :param dict_value: the data to save
        :param drop: when a matching document exists, delete it and insert
            ``dict_value`` instead of updating it in place
        """
        if not record:  # no match: plain insert
            self.coll.insert_one(dict_value)
        elif drop is True:  # delete then insert, i.e. replace the document
            self.coll.delete_one(record)
            self.coll.insert_one(dict_value)
        else:  # merge the new fields into the existing document
            self.coll.update_one(record, {"$set": dict_value})

    def get_all_data(self, record_list=None):
        """Yield every document of the current collection.

        :param record_list: field names to return (passed to ``find`` as the
            projection); defaults to ``["_id"]``
        """
        if record_list is None:
            record_list = ["_id"]
        yield from self.coll.find({}, record_list)

    @staticmethod
    def _dataframe_to_records(dataframe):
        """Return ``dataframe`` as a list of per-row dicts (column -> value)."""
        # Use the full orient name: the abbreviation "record" is deprecated
        # and rejected by pandas 2.x.
        return dataframe.to_dict(orient="records")

    @staticmethod
    def _build_filter(values, record_by):
        """Build a query filter from the ``record_by`` keys present in ``values``.

        Returns None when none of the keys are present: an empty filter would
        match an arbitrary existing document, which would then be overwritten.
        """
        find_dict = {key: values[key] for key in record_by if key in values}
        return find_dict or None

    def _find_existing(self, values, record_by):
        """Return the matching document (``_id`` only), or None if there is none."""
        find_dict = self._build_filter(values, record_by)
        if find_dict is None:
            return None
        # Only _id is needed to address the document in the subsequent write.
        return self.coll.find_one(find_dict, {"_id": 1})

    def _upsert_row(self, values, record_by, drop):
        """Look up ``values`` by ``record_by`` and insert / replace / update it."""
        record = self._find_existing(values, record_by)
        self._save_or_update_mongodb(record, values, drop=drop)

    def save_new_dataframe_to_mongo(self, dataframe: pd.DataFrame):
        """Insert every row of ``dataframe`` as a new document.

        An ``_id`` column, if present, is used as the document id; otherwise
        an ``_id`` is assigned automatically. An empty dataframe is a no-op.
        """
        rows = self._dataframe_to_records(dataframe)
        if not rows:
            # insert_many refuses an empty document list.
            return
        self.get_coll().insert_many(rows)

    def update_dataframe(self, dataframe: pd.DataFrame, record_by=None, drop=False, max_workers=None):
        """Save every row of ``dataframe``, updating rows that already exist.

        Each row is matched against the collection by its ``record_by``
        columns: no match -> insert; match -> ``$set`` update, or delete and
        re-insert when ``drop`` is True. Rows whose match keys are all absent
        are always inserted. Rows are written concurrently, so duplicate keys
        within ``dataframe`` give an order-dependent result.

        :param dataframe: rows to save
        :param record_by: column names used to find an existing document;
            defaults to ``["_id"]``; names that are not columns are ignored
        :param drop: replace matched documents instead of updating them
        :param max_workers: size of the worker thread pool (None lets
            ``ThreadPoolExecutor`` choose)
        :raises: re-raises the first exception raised by any write
        """
        from concurrent.futures import ThreadPoolExecutor

        if record_by is None:
            record_by = ["_id"]
        # Ignore match keys that are not columns of the dataframe.
        record_by = [key for key in record_by if key in dataframe.columns]
        rows = self._dataframe_to_records(dataframe)
        if not rows:
            return
        # A bounded pool instead of one thread per row; the MongoClient is
        # shared by all worker threads.
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            futures = [pool.submit(self._upsert_row, row, record_by, drop) for row in rows]
        # Surface worker failures instead of silently losing writes.
        for future in futures:
            future.result()

    def update_dict(self, new_dict_value: dict, record_by=None, drop=False):
        """Save one dict, updating the matching document if it exists.

        :param new_dict_value: the data to save
        :param record_by: keys used to find an existing document; defaults to
            ``["_id"]``; keys missing from ``new_dict_value`` are ignored, and
            if none remain the dict is inserted as a new document
        :param drop: replace a matched document instead of updating it
        """
        if record_by is None:
            record_by = ["_id"]
        self._upsert_row(new_dict_value, record_by, drop)
示例代码
from pymongo import MongoClient
import pandas as pd
import numpy as np
class MongoHelper(object):
    """Helper for saving pandas DataFrames / dicts into one MongoDB collection.

    Holds a database handle and an optional "current" collection; every
    save/update method operates on that current collection (``self.coll``).
    """

    def __init__(self, database, collection=None, host="127.0.0.1", port=27017):
        """Connect to MongoDB and select a database and (optionally) a collection.

        :param database: name of the database to use
        :param collection: name of the collection to use; can also be set
            later with :meth:`set_collection`
        :param host: MongoDB server host
        :param port: MongoDB server port
        """
        # Keep the client on the instance so its connection pool can be closed.
        self.mongo_client = MongoClient(host=host, port=port)
        # Database handle; pymongo's client maintains its own connection pool.
        self.mongo_db = self.mongo_client.get_database(database)
        self.coll = None
        if collection:
            self.coll = self.mongo_db.get_collection(collection)

    def close(self):
        """Close the underlying MongoClient (and its connection pool)."""
        self.mongo_client.close()

    def set_collection(self, collection):
        """Switch the current collection to ``collection`` (a collection name)."""
        self.coll = self.mongo_db.get_collection(collection)

    def get_coll(self):
        """Return the current collection, or None if none has been selected."""
        return self.coll

    def _save_or_update_mongodb(self, record, dict_value, drop=False):
        """Write ``dict_value`` depending on whether a matching document exists.

        :param record: the existing document (used as the write filter), or a
            falsy value when no matching document was found
        :param dict_value: the data to save
        :param drop: when a matching document exists, delete it and insert
            ``dict_value`` instead of updating it in place
        """
        if not record:  # no match: plain insert
            self.coll.insert_one(dict_value)
        elif drop is True:  # delete then insert, i.e. replace the document
            self.coll.delete_one(record)
            self.coll.insert_one(dict_value)
        else:  # merge the new fields into the existing document
            self.coll.update_one(record, {"$set": dict_value})

    def get_all_data(self, record_list=None):
        """Yield every document of the current collection.

        :param record_list: field names to return (passed to ``find`` as the
            projection); defaults to ``["_id"]``
        """
        if record_list is None:
            record_list = ["_id"]
        yield from self.coll.find({}, record_list)

    @staticmethod
    def _dataframe_to_records(dataframe):
        """Return ``dataframe`` as a list of per-row dicts (column -> value)."""
        # Use the full orient name: the abbreviation "record" is deprecated
        # and rejected by pandas 2.x.
        return dataframe.to_dict(orient="records")

    @staticmethod
    def _build_filter(values, record_by):
        """Build a query filter from the ``record_by`` keys present in ``values``.

        Returns None when none of the keys are present: an empty filter would
        match an arbitrary existing document, which would then be overwritten.
        """
        find_dict = {key: values[key] for key in record_by if key in values}
        return find_dict or None

    def _find_existing(self, values, record_by):
        """Return the matching document (``_id`` only), or None if there is none."""
        find_dict = self._build_filter(values, record_by)
        if find_dict is None:
            return None
        # Only _id is needed to address the document in the subsequent write.
        return self.coll.find_one(find_dict, {"_id": 1})

    def _upsert_row(self, values, record_by, drop):
        """Look up ``values`` by ``record_by`` and insert / replace / update it."""
        record = self._find_existing(values, record_by)
        self._save_or_update_mongodb(record, values, drop=drop)

    def save_new_dataframe_to_mongo(self, dataframe: pd.DataFrame):
        """Insert every row of ``dataframe`` as a new document.

        An ``_id`` column, if present, is used as the document id; otherwise
        an ``_id`` is assigned automatically. An empty dataframe is a no-op.
        """
        rows = self._dataframe_to_records(dataframe)
        if not rows:
            # insert_many refuses an empty document list.
            return
        self.get_coll().insert_many(rows)

    def update_dataframe(self, dataframe: pd.DataFrame, record_by=None, drop=False, max_workers=None):
        """Save every row of ``dataframe``, updating rows that already exist.

        Each row is matched against the collection by its ``record_by``
        columns: no match -> insert; match -> ``$set`` update, or delete and
        re-insert when ``drop`` is True. Rows whose match keys are all absent
        are always inserted. Rows are written concurrently, so duplicate keys
        within ``dataframe`` give an order-dependent result.

        :param dataframe: rows to save
        :param record_by: column names used to find an existing document;
            defaults to ``["_id"]``; names that are not columns are ignored
        :param drop: replace matched documents instead of updating them
        :param max_workers: size of the worker thread pool (None lets
            ``ThreadPoolExecutor`` choose)
        :raises: re-raises the first exception raised by any write
        """
        from concurrent.futures import ThreadPoolExecutor

        if record_by is None:
            record_by = ["_id"]
        # Ignore match keys that are not columns of the dataframe.
        record_by = [key for key in record_by if key in dataframe.columns]
        rows = self._dataframe_to_records(dataframe)
        if not rows:
            return
        # A bounded pool instead of one thread per row; the MongoClient is
        # shared by all worker threads.
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            futures = [pool.submit(self._upsert_row, row, record_by, drop) for row in rows]
        # Surface worker failures instead of silently losing writes.
        for future in futures:
            future.result()

    def update_dict(self, new_dict_value: dict, record_by=None, drop=False):
        """Save one dict, updating the matching document if it exists.

        :param new_dict_value: the data to save
        :param record_by: keys used to find an existing document; defaults to
            ``["_id"]``; keys missing from ``new_dict_value`` are ignored, and
            if none remain the dict is inserted as a new document
        :param drop: replace a matched document instead of updating it
        """
        if record_by is None:
            record_by = ["_id"]
        self._upsert_row(new_dict_value, record_by, drop)
def main():
    """Demo: insert 100 random rows, then save a second frame with the same _ids."""
    helper = MongoHelper(database='test_db', collection="test_coll")

    # Initial data: random columns a and b, keyed by string _ids "0".."99".
    first = pd.DataFrame(np.random.randn(100, 2), columns=['a', 'b'])
    first["_id"] = list(map(str, range(len(first))))
    helper.save_new_dataframe_to_mongo(first)

    # Changed data: columns a and c with the same _ids, so the existing
    # documents are matched by _id and updated rather than inserted.
    second = pd.DataFrame(np.random.randn(100, 2), columns=['a', 'c'])
    second["_id"] = list(map(str, range(len(second))))
    helper.update_dataframe(second)


if __name__ == '__main__':
    main()
如果是
边栏推荐
- 本机Redis Desktop Manager连不上vmware的redis
- 【愚公系列】华为云云数据库MySQL的体验流程|【华为云至简致远】
- 连这几个网站都不知道,怪不得你的消息比别人落后
- sqllabs 1~6通关详解
- leetcode/delete the nth node from the bottom of the linked list
- leetcode/回文子字符串的个数
- Synergistic authors open source throttling, 2022 trend of technology foresight (asynchronous programming/container technology)
- web-sql注入
- Streamsets Data Collector 3.12
- 保险,一生必备
猜你喜欢
【愚公系列】华为云云数据库MySQL的体验流程|【华为云至简致远】
Notes on the development of kindergarten enrollment registration system based on WeChat applet
sqoop连接MySQL跟本机不一致是为什么
从洞察到决策,一文解读标签画像体系建设方法论丨DTVision分析洞察篇
领域驱动设计系列贫血模型和充血模型
干货:从零设计高并发架构
[Unity entry plan] Unity instance - how to protect data members through encapsulation in C#
A16z:为什么 NFT 创作者要选择 cc0?
分享这些2022设计师们决不能错过的Blender新插件
你真的会软件测试bug分析定位嘛
随机推荐
我分析30w条数据后发现,西安新房公摊最低的竟是这里?
First online!Messaging middleware fairy notes, covering the essence of Alibaba's ten years of technology
bzoj3693 圆桌会议 hall定理+线段树
C. Build Permutation(构造/数论)
面试题 17.05. 字母与数字
从洞察到决策,一文解读标签画像体系建设方法论丨DTVision分析洞察篇
全志V853芯片Tina下RTSP环境搭建方法
分布式服务治理
【愚公系列】华为云云数据库MySQL的体验流程|【华为云至简致远】
想去银行测试?那这套题目你必须要会
分布式架构服务调用
leetcode/number of palindromic substrings
一文搞懂│XSS攻击、SQL注入、CSRF攻击、DDOS攻击、DNS劫持
你真的会软件测试bug分析定位嘛
Take you to play with the "Super Cup" ECS features and experiment on the pit [HUAWEI CLOUD is simple and far]
[Online interviewer] How to achieve deduplication and idempotency
Ubuntu下使用sudo dpkg --configure -a后数据库出现问题
[Unity entry plan] Use the double blood bar method to control the blood loss speed of the damage area
Guanghong Technology: The company provides manufacturing services for Xiaomi, Samsung, OPPO, Nokia and other products in India
HMS Core Analysis Service Intelligent Operation Version 6.5.1 Launched