我必须在MongoDB上进行大量的插入和更新.
我正在尝试测试多处理来完成这些任务.为此,我创建了这个简单的代码.我的虚拟数据是:
documents = [{"a number": i} for i in range(1000000)]
没有多处理:
time1s = time.time()
client = MongoClient()
db = client.mydb
col = db.mycol
for doc in documents:
col.insert_one(doc)
time1f = time.time()
print(time1f-time1s)
我有150秒.
通过多处理,我根据需要定义了以下工作函数,并在Pymongo’s FAQs中进行了描述.
def insert_doc(document):
client = MongoClient()
db = client.mydb
col = db.mycol
col.insert_one(document)
但是,当我运行我的代码时:
time2s = time.time()
pool = mp.Pool(processes=16)
pool.map(insert_doc, documents)
pool.close()
pool.join()
time2f = time.time()
print(time2f - time2s)
我收到一个错误:
pymongo.errors.ServerSelectionTimeoutError: localhost:27017: [Errno
99] Cannot assign requested address
在提出错误之前,共处理了26447份文件.虽然遇到该错误的人没有使用多处理,但是在here中解释了此错误.解决方案是只打开一个MongoClient,但是当我想进行多处理时,这是不可能的.有没有解决方法?谢谢你的帮助.
解决方法:
您的代码为示例中的每个文档创建一个新的MongoClient(就像您链接的问题一样).这要求您为每个新查询打开一个新套接字.这会破坏PyMongo的连接池,除了速度极慢之外,它还意味着你比TCP堆栈更快地打开和关闭套接字:你在TIME_WAIT状态下留下太多套接字,这样你最终会耗尽端口.
如果您为每个客户端插入大量文档,则可以创建更少的客户端,从而打开更少的套接字:
import multiprocessing as mp
import time
from pymongo import MongoClient
documents = [{"a number": i} for i in range(1000000)]
def insert_doc(chunk):
client = MongoClient()
db = client.mydb
col = db.mycol
col.insert_many(chunk)
chunk_size = 10000
def chunks(sequence):
# Chunks of 1000 documents at a time.
for j in range(0, len(sequence), chunk_size):
yield sequence[j:j + chunk_size]
time2s = time.time()
pool = mp.Pool(processes=16)
pool.map(insert_doc, chunks(documents))
pool.close()
pool.join()
time2f = time.time()
print(time2f - time2s)
标签:python,mongodb,pymongo 来源: https://codeday.me/bug/20190724/1523947.html
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。