标签:index doc bulk ElasticSearch key 使用 path type
插入数据
示例
#!/usr/bin/python
# -*- coding:utf-8 -*-
import os
import re
import json
import time
import elasticsearch
from elasticsearch.helpers import bulk
from multiprocessing import Pool
def add_one(file_path, doc_type, index):
"""准备插入一条"""
print doc_type, index
es_client = elasticsearch.Elasticsearch(hosts=[{"host": "localhost", "port": "9200"}])
with open(file_path, "r") as f:
for line in f:
try:
line = re.sub("\n", "", line)
dict_obj = json.loads(line)
es_client.index(index=index, doc_type=doc_type, body=dict_obj)
except Exception as e:
print "出错了, 错误信息: {}".format(e)
def add_bulk(doc_type, file_path, bulk_num, index):
""""""
es_client = elasticsearch.Elasticsearch(hosts=[{"host": "localhost", "port": "9200"}])
action_list = []
global param
with open(file_path, "r") as f:
for line in f:
# 去除每一行数据中的"\n"字符, 也可以替换为"\\n"
line = line.replace("\n", "")
dict_obj = json.loads(line)
data = {}
# 第一层
for key_1, value_1 in dict_obj.iteritems():
data[key_1] = {}
if isinstance(value_1, dict):
# 第二层
for key_2, value_2 in value_1.iteritems():
data[key_1][key_2] = {}
if isinstance(value_2, dict):
# 第三层
for key_3, value_3 in value_2.iteritems():
data[key_1][key_2][key_3] = {}
if isinstance(value_3, dict):
# 第四层
for key_4, value_4 in value_3.iteritems():
if isinstance(value_4, dict):
value_4 = json.dumps(value_4)
data[key_1][key_2][key_3][key_4] = value_4
else:
data[key_1][key_3] = value_3
else:
data[key_1][key_2] = value_2
else:
data[key_1] = value_1
# 根据bulk_num的值发送一个批量插入请求
action_list.append({
"_op_type": "index",
"_index": index,
"_type": doc_type,
"_source": data,
})
if len(action_list) >= bulk_num:
try:
print "Start Bulk {}...".format(doc_type)
success, failed = bulk(es_client, action_list, index=index, raise_on_error=True, request_timeout=300)
print "End Bulk {}...".format(doc_type)
except Exception as e:
print "出错了, Type:{}, 错误信息:{}".format(doc_type, e[0])
finally:
del action_list[0:len(action_list)]
# 如果不是bulk_num的等值, 那么就判断列表是否为空, 再次发送一次请求
if len(action_list) > 0:
try:
print "Start Bulk {}...".format(doc_type)
success, failed = bulk(es_client, action_list, index=index, raise_on_error=True, request_timeout=300)
print "End Bulk {}...".format(doc_type)
except Exception as e:
print "出错了, Type:{}, 错误信息:{}".format(doc_type, e[0])
finally:
del action_list[0:len(action_list)]
def mulit_process(path, index, bulk_num, data):
""""""
# 多进程执行
pool = Pool(10)
results = []
for i in data:
doc_type = i["doc_type"]
file_path = i["file_path"]
result = pool.apply_async(add_bulk, args=(doc_type, file_path, bulk_num, index))
results.append(result)
pool.close()
pool.join()
# for result in results:
# print result.get()
def all_info(path):
data = []
for i in os.listdir(path):
file_dict = {}
if i.endswith(".json"):
doc_type = i.split("_")[0]
file_path = path + i
file_dict["doc_type"] = doc_type
file_dict["file_path"] = file_path
data.append(file_dict)
return data
def es_insert(process_func=None):
""""""
# 库
index = "test"
# 文件路径
path="/home/data"
# 批量插入的数量, 如果是json整条数据插入的话, 可能会出现字段过长的问题, 导致插不进去, 适当调整bulk_num的值
bulk_num = 5000
if not path.endswith("/"):
path += "/"
data = all_info(path)
if process_func == "bulk":
# 插入多条, doc_type, file_path, bulk_num, index
add_bulk("80", path + "80_result.json", bulk_num, index)
elif process_func == "one":
# 插入单条file_path, doc_type, index
add_one(path + "80_result.json", "80", index)
else:
# 多进程
mulit_process(path, index, bulk_num, data)
if __name__ == "__main__":
# 计算脚本执行时间
start_time = time.time()
# 插入数据
es_insert()
# 计算脚本执行时间
end_time = time.time()
print end_time - start_time
查询数据
curl -XPOST "0.0.0.0:9200/test/_search?pretty" -d '{"query":{"bool":{"must":[{"query_string":{"default_field":"ip","query":"40.74.78.234"}}],"must_not":[],"should":[]}},"from":0,"size":10,"sort":[],"aggs":{}}'
删除索引
curl -X DELETE "localhost:9200/index"
index
是索引名称
即库名
删除type
curl -XPOST 'localhost:9200/index/type/_delete_by_query?conflicts=proceed&pretty' -H 'Content-Type: application/json' -d'{"query": {"match_all": {}}}'
index
是索引名称
即库名
, type
是类型
即表
。
标签:index,doc,bulk,ElasticSearch,key,使用,path,type 来源: https://www.cnblogs.com/zzhaolei/p/11067975.html
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。