Using ElasticSearch

Inserting data

Example

The script below scans a directory for JSON-lines files and writes them into ElasticSearch, either one document at a time, in batches through the bulk helper, or across multiple worker processes.

#!/usr/bin/python
# -*- coding:utf-8 -*-

import os
import re
import json
import time
import elasticsearch

from elasticsearch.helpers import bulk
from multiprocessing import Pool


def add_one(file_path, doc_type, index):
    """Insert documents from a JSON-lines file one at a time."""
    print(doc_type, index)
    es_client = elasticsearch.Elasticsearch(hosts=[{"host": "localhost", "port": "9200"}])
    with open(file_path, "r") as f:
        for line in f:
            try:
                # Strip the trailing newline before parsing the JSON document
                line = re.sub("\n", "", line)
                dict_obj = json.loads(line)
                es_client.index(index=index, doc_type=doc_type, body=dict_obj)
            except Exception as e:
                print("Error, message: {}".format(e))

def add_bulk(doc_type, file_path, bulk_num, index):
    """Insert documents from a JSON-lines file in batches of bulk_num."""
    es_client = elasticsearch.Elasticsearch(hosts=[{"host": "localhost", "port": "9200"}])
    action_list = []

    with open(file_path, "r") as f:

        for line in f:
            # Strip the "\n" from each line; it could also be replaced with "\\n"
            line = line.replace("\n", "")
            dict_obj = json.loads(line)

            data = {}
            # First level
            for key_1, value_1 in dict_obj.items():
                data[key_1] = {}
                if isinstance(value_1, dict):
                    # Second level
                    for key_2, value_2 in value_1.items():
                        data[key_1][key_2] = {}
                        if isinstance(value_2, dict):
                            # Third level
                            for key_3, value_3 in value_2.items():
                                data[key_1][key_2][key_3] = {}
                                if isinstance(value_3, dict):
                                    # Fourth level: anything nested deeper is stored as a JSON string
                                    for key_4, value_4 in value_3.items():
                                        if isinstance(value_4, dict):
                                            value_4 = json.dumps(value_4)
                                        data[key_1][key_2][key_3][key_4] = value_4
                                else:
                                    data[key_1][key_2][key_3] = value_3
                        else:
                            data[key_1][key_2] = value_2
                else:
                    data[key_1] = value_1

            # Queue one action; a bulk request is sent once bulk_num actions have accumulated
            action_list.append({
                "_op_type": "index",
                "_index": index,
                "_type": doc_type,
                "_source": data,
            })

            if len(action_list) >= bulk_num:
                try:
                    print("Start Bulk {}...".format(doc_type))
                    success, failed = bulk(es_client, action_list, index=index, raise_on_error=True, request_timeout=300)
                    print("End Bulk {}...".format(doc_type))
                except Exception as e:
                    print("Error, Type: {}, message: {}".format(doc_type, e))
                finally:
                    del action_list[0:len(action_list)]

        # If the leftovers never reached bulk_num, flush whatever is still in the list
        if len(action_list) > 0:
            try:
                print("Start Bulk {}...".format(doc_type))
                success, failed = bulk(es_client, action_list, index=index, raise_on_error=True, request_timeout=300)
                print("End Bulk {}...".format(doc_type))
            except Exception as e:
                print("Error, Type: {}, message: {}".format(doc_type, e))
            finally:
                del action_list[0:len(action_list)]

def multi_process(path, index, bulk_num, data):
    """Run add_bulk for each file in a pool of worker processes."""
    # Execute the bulk inserts in parallel
    pool = Pool(10)

    results = []
    for i in data:
        doc_type = i["doc_type"]
        file_path = i["file_path"]
        result = pool.apply_async(add_bulk, args=(doc_type, file_path, bulk_num, index))
        results.append(result)

    pool.close()
    pool.join()

    # for result in results:
    #     print(result.get())

def all_info(path):
    """Collect the doc_type and file_path of every .json file under path."""
    data = []
    for i in os.listdir(path):
        file_dict = {}
        if i.endswith(".json"):
            doc_type = i.split("_")[0]
            file_path = path + i
            file_dict["doc_type"] = doc_type
            file_dict["file_path"] = file_path
            data.append(file_dict)

    return data

def es_insert(process_func=None):
    """Choose between single-document, bulk, or multi-process insertion."""
    # Index (database) name
    index = "test"
    # Directory that holds the data files
    path = "/home/data"

    # Number of documents per bulk request. When whole JSON documents are inserted,
    # overly long fields may cause the request to fail, so adjust bulk_num as needed.
    bulk_num = 5000

    if not path.endswith("/"):
        path += "/"

    data = all_info(path)

    if process_func == "bulk":
        # Bulk insert: doc_type, file_path, bulk_num, index
        add_bulk("80", path + "80_result.json", bulk_num, index)
    elif process_func == "one":
        # Single insert: file_path, doc_type, index
        add_one(path + "80_result.json", "80", index)
    else:
        # Multi-process insert
        multi_process(path, index, bulk_num, data)

if __name__ == "__main__":
    # Record the start time to measure script execution
    start_time = time.time()

    # Insert the data
    es_insert()

    # Report how long the script took
    end_time = time.time()
    print(end_time - start_time)
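
For reference, the script above expects each input file to be named like <doc_type>_result.json (for example 80_result.json) and to hold one JSON object per line. A purely illustrative line might look like:

{"ip": "40.74.78.234", "info": {"port": 80, "banner": {"server": "nginx"}}}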

Querying data

curl -XPOST "0.0.0.0:9200/test/_search?pretty" -H 'Content-Type: application/json' -d '{"query":{"bool":{"must":[{"query_string":{"default_field":"ip","query":"40.74.78.234"}}],"must_not":[],"should":[]}},"from":0,"size":10,"sort":[],"aggs":{}}'
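
The same query can also be sent through the Python client used in the insert script. A minimal sketch, assuming the "test" index and the "ip" field shown in the curl example:

import elasticsearch

es_client = elasticsearch.Elasticsearch(hosts=[{"host": "localhost", "port": "9200"}])

query_body = {
    "query": {
        "bool": {
            "must": [
                {"query_string": {"default_field": "ip", "query": "40.74.78.234"}}
            ],
            "must_not": [],
            "should": [],
        }
    },
    "from": 0,
    "size": 10,
}

# Search the "test" index and print each matching document
response = es_client.search(index="test", body=query_body)
for hit in response["hits"]["hits"]:
    print(hit["_source"])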

Deleting an index

curl -X DELETE "localhost:9200/index"

Here index is the index name, i.e. the "database" being dropped.
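
The same can be done with the Python client; a minimal sketch with a placeholder index name:

import elasticsearch

es_client = elasticsearch.Elasticsearch(hosts=[{"host": "localhost", "port": "9200"}])

# Drop the whole index; ignore the 404 error if it does not exist
es_client.indices.delete(index="index", ignore=[404])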

Deleting a type

curl -XPOST 'localhost:9200/index/type/_delete_by_query?conflicts=proceed&pretty' -H 'Content-Type: application/json' -d'{"query": {"match_all": {}}}'

Here index is the index name and type is the document type. This deletes every document of that type; the type mapping itself is not removed.
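
The equivalent through the Python client, again a minimal sketch with placeholder index and type names:

import elasticsearch

es_client = elasticsearch.Elasticsearch(hosts=[{"host": "localhost", "port": "9200"}])

# Delete every document of the given type; conflicts="proceed" skips version conflicts
es_client.delete_by_query(
    index="index",
    doc_type="type",
    body={"query": {"match_all": {}}},
    conflicts="proceed",
)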

Source: https://www.cnblogs.com/zzhaolei/p/11067975.html
