0%

使用 Jupyter Notebook 操作 Elasticsearch 数据库

使用 Notebook 按条件查询 ES 数据,并写入 CSV 文件

1.导入依赖包

1
2
3
4
5
6
7
8
9
10
11
12
13
# Import the Elasticsearch client
from elasticsearch import Elasticsearch

# Import the pretty-printer and bind a short alias, `pr`, to it
import pprint
pr = pprint.pprint

# Import pandas, used below to build the table that is exported to CSV
import pandas as pd

# Suppress warnings so they do not clutter the notebook output
# (note: "ignore" with no category filters out every warning, not just minor ones)
import warnings
warnings.filterwarnings("ignore")

2.定义连接对象

1
2
3
4
5
6
7
8
def connect_elasticsearch(host='localhost', port=9200):
    """Create an Elasticsearch client and report whether the node answers a ping.

    Args:
        host: Hostname of the Elasticsearch node. Defaults to 'localhost'.
        port: Port of the Elasticsearch node. Defaults to 9200.

    Returns:
        The Elasticsearch client object. The client is returned even when
        the ping fails, so callers that need a hard guarantee should check
        ``es.ping()`` themselves.
    """
    es = Elasticsearch([{'host': host, 'port': port}])

    # ping() is used as a cheap reachability check; the outcome is only
    # printed, never raised.
    if es.ping():
        print('Connected ')
    else:
        print('it could not connect!')
    return es

3.实例化并验证

1
2
3
4
5
# Build the client; connect_elasticsearch() prints the ping outcome as a side effect.
es = connect_elasticsearch()

# Ping the node once more; the boolean result is not captured here.
es.ping()

# Pretty-print the metadata returned by the client's info() call.
pr(es.info())
Connected 
{'cluster_name': 'elasticsearch_zhangminglei',
 'cluster_uuid': 'nKMmhRTTS7qdHBNmOwrgvQ',
 'name': 'zhangmingleideMacBook-Pro.local',
 'tagline': 'You Know, for Search',
 'version': {'build_date': '2022-05-18T18:04:20.964345128Z',
             'build_flavor': 'default',
             'build_hash': '79878662c54c886ae89206c685d9f1051a9d6411',
             'build_snapshot': False,
             'build_type': 'tar',
             'lucene_version': '8.11.1',
             'minimum_index_compatibility_version': '6.0.0-beta1',
             'minimum_wire_compatibility_version': '6.8.0',
             'number': '7.17.4'}}

4.按条件查询打印

1
2
3
4
5
6
7
8
9
# res = es.search(index = 'kibana_sample_data_logs', body = {'query': {"match_all": {}} ,"size": 100} )

# res = es.search(index = 'kibana_sample_data_logs', body = {"query":{"bool":{"must":[{"prefix":{"host":"www"}}],"must_not":[],"should":[]}},"from":0,"size":10,"sort":[],"aggs":{}} )


# res = es.search(index = 'kibana_sample_data_logs', body ={'query':{"bool":{"must":[{"query_string":{"default_field":"host","query":"elastic-elastic-elastic.org"}}]}},"size":100} )

# Bool query against the sample logs index: all three "must" clauses have to
# match (exact referer term, bytes greater than 6000, extension matching "deb").
# "from"/"size" request the first 10 hits; the empty clauses are kept as-is.
res = es.search(
    index='kibana_sample_data_logs',
    body={
        "query": {
            "bool": {
                "must": [
                    {"term": {"referer": "http://twitter.com/success/wendy-lawrence"}},
                    {"range": {"bytes": {"gt": "6000"}}},
                    {"match": {"extension": "deb"}},
                ],
                "must_not": [],
                "should": [],
            }
        },
        "from": 0,
        "size": 10,
        "sort": [],
        "aggs": {},
    },
)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Pull the list of matching documents out of the search response.
aim = res["hits"]['hits']

# Keep only the fields of interest from each hit, one [ip, agent, host] row per document.
outter = [
    [doc['_source']['clientip'], doc['_source']['agent'], doc['_source']['host']]
    for doc in aim
]

print(outter)
[['223.87.60.27', 'Mozilla/5.0 (X11; Linux x86_64; rv:6.0a1) Gecko/20110421 Firefox/6.0a1', 'artifacts.elastic.co'], ['223.87.60.27', 'Mozilla/5.0 (X11; Linux x86_64; rv:6.0a1) Gecko/20110421 Firefox/6.0a1', 'artifacts.elastic.co']]

5.导出内容为CSV

1
2
3
# Column headers, in the same order as the values inside each row of `outter`.
column_names = ["IP地址", "访问标识", "主机名"]
data = pd.DataFrame(outter, columns=column_names)

# Write the table to disk as UTF-8 with a BOM ("utf_8_sig"). The DataFrame's
# row index is written too, because to_csv is called without index=False.
data.to_csv("./out12.csv", encoding="utf_8_sig")
1