如何导入数十亿DNS数据到Elasticsearch中
2022-8-8 09:21:53 Author: mp.weixin.qq.com(查看原文) 阅读量:2 收藏

大家好,我是BaCde。转眼间距离上次更新已经是一年前了。计划了一些输出的内容因为各种原因一直没能完成。会尽量抽出时间来写写最近积累的一些内容,计划后续会逐步跟大家分享。

进入今天的主题。写这一部分内容主要还是有朋友问起如何导入的问题,所以今天就来说说数据导入部分。

针对数十亿数据的批量写入,可以从三个方面去优化,分为是硬件、Elasticsearch 优化、脚本优化。具体需要根据具体实际情况来权衡。

硬件部分,其实也不需要花费很大,主要考虑的是磁盘写入性能。硬件就是在硬盘上下功夫,有条件的可以使用 SSD 做 Raid,我比较偏好 Raid10,但是相对比较费钱。如果不考虑组Raid,那么单块的 SSD 硬盘,选择支持NVME协议 M.2 接口的 SSD,读写性能已经足够。再差一些的可以选择专业些的STAT接口的SSD,其性能也要比机械硬盘强上几倍。具体选购大家自行去网上查下文章即可。我目前使用的主要有Intel和三星的 SSD。

Elasticsearch 的优化之前内容里有提到过,就是修改一些配置。另外还需要注意的就是版本上的差异,自从 Elasticsearch 7 以后的默认应该都是一个分片和副本,由于涉及数据量很大,分片数量要做调整。

之前的配置就是在写入过程中修改刷新时间和副本数,以提升写入的速度。然后等写入完成,再修改回去即可。

{  "index": {    "refresh_interval": -1,    "number_of_replicas": 0  }}

最后就是导入脚本,其实现在导入数据已经相当的简单,从原来的Bulk到现在的 streaming_bulk 和 parallel_bulk 这种更简单便捷的方式。直接按照官方文档给出的示例就可以快速开发出来。顺便说一下,建议大家不了解 python 迭代器、生成器、yield 等内容的可以去学习一下。详细的内容可以看官方文档。地址:https://elasticsearch-py.readthedocs.io/en/master/helpers.html

最后,上代码:

数据结构内容例子,数据的获取大家自行解决,结构不同根据实际情况做调整即可。

{"timestamp":"1640909596","name":"02k0ue.yumenggou.com","type":"cname","value":"ziyuan.baidu.com."}{"timestamp":"1640909798","name":"02k1.com","type":"mx","value":"0 mail.02k1.com."}{"timestamp":"1640909561","name":"02k1.friendlyvote.com","type":"cname","value":"traff-1.hugedomains.com."}{"timestamp":"1640909566","name":"02k1289.jlszhkjyxgs.com","type":"cname","value":"ziyuan.baidu.com."}{"timestamp":"1640909554","name":"02k16t25.coinstimes.com","type":"cname","value":"traff-1.hugedomains.com."}{"timestamp":"1640909583","name":"02k1799.elegancehouse.com","type":"cname","value":"traff-1.hugedomains.com."}{"timestamp":"1640909540","name":"02k199jx8.xiangyishu.com","type":"cname","value":"overdue.aliyun.com."}{"timestamp":"1640909539","name":"02k1w.txgc010.com","type":"cname","value":"sg01.pylist-blog.com."}{"timestamp":"1640909559","name":"02k2.friendlyvote.com","type":"cname","value":"traff-1.hugedomains.com."}{"timestamp":"1640909548","name":"02k262.huiyanji.com","type":"cname","value":"ziyuan.baidu.com."}

代码示例

try:    import simplejson as jsonexcept ImportError:    import json
from elasticsearch import helpers
es = elasticsearch.Elasticsearch(hosts=[{"host": "127.0.0.1", "port": "9200"}], retry_on_timeout=True,max_retries=10)
def importdns(filename): with open(filename,"r",encoding='utf-8') as fp: for line in fp: if line: temp = json.loads(line) # 中间可以加入自己需要做处理的代码。看各自的需求 temp["_index"] = "rapiddns-" + temp["type"] yield temp
path = "dns_data.json"
for ok, response in helpers.streaming_bulk(client=es,max_retries=5,chunk_size=5000,actions=importdns(path,)): if not ok: print(response)

本文为原创内容,转载请标明出处。

关注我,不迷路。


文章来源: https://mp.weixin.qq.com/s?__biz=Mzg4NDU0ODMxOQ==&mid=2247485732&idx=1&sn=2f463548ba8daa2e590dea411e2bc3bb&chksm=cfb73e94f8c0b7823030120c8c75854533298382eaf988b2cbc24ba442ce27abcd6540f1c494&scene=58&subscene=0#rd
如有侵权请联系:admin#unsafe.sh