Struggling in importing wikipedia into Elasticsearch (2)

By Z.H. Fu
切问录 www.fuzihao.org

We introduced many tools and tricks in my previous post, Struggling in importing wikipedia into Elasticsearch. In that post, we successfully imported Wikipedia into elasticsearch with logstash. However, things are not settled. The biggest problem with logstash is that it is extremely user-unfriendly. Even filtering out an HTML tag can cost you half a day of searching Google for how to modify the config file. This became very annoying, since I had totally forgotten all the tricks of logstash. Therefore, we will explore how to use Python to import Wikipedia into elasticsearch directly.

The first step is to convert the Wikipedia source file into a more structured format. I chose gensim's segment_wiki script to do this. It can be run like this:

python -m gensim.scripts.segment_wiki -i -f enwiki-20190320-pages-articles-multistream.xml.bz2 -o enwiki-20190320-pages-articles-multistream.json.gz -w 100

It will convert the Wikipedia dump to a .json.gz file, in which each line contains a JSON dict representing one page. The details can be found in the official gensim documentation.
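
Before importing anything, it helps to peek at the converted file. The following is a minimal sketch, assuming the output is gzip-compressed with one JSON object per line, and assuming the field names "title" and "section_titles" that gensim's documentation describes for segment_wiki (check them against your gensim version). The helper names iter_pages and peek are hypothetical, introduced only for this illustration.

```python
import gzip
import json


def iter_pages(path):
    """Yield one dict per non-empty line of a gzip-compressed JSON-lines file."""
    with gzip.open(path, "rt", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:
                yield json.loads(line)


def peek(path, n=3):
    """Return (title, number of sections) for the first n pages.

    The keys "title" and "section_titles" are assumed from gensim's
    documented output format; missing keys fall back to None / 0.
    """
    summary = []
    for i, page in enumerate(iter_pages(path)):
        if i >= n:
            break
        summary.append((page.get("title"), len(page.get("section_titles", []))))
    return summary
```

Calling peek("enwiki-20190320-pages-articles-multistream.json.gz") reads only the first few lines, so it returns quickly even though the full file is large.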

Afterwards, we get a well-formatted file, enwiki-20190320-pages-articles-multistream.json.gz. We will then import it line by line into elasticsearch.

Before we begin to write our code, we should first set some parameters so that elasticsearch can hold a large amount of data and handle heavy request loads. There are two modifications:

  1. in config/elasticsearch.yml, add:
thread_pool:
    bulk:
        size: 32
        queue_size: 50000

This makes the bulk thread pool and its queue much larger than the defaults, so elasticsearch can accept many concurrent bulk requests instead of rejecting them when the queue fills up.

  2. in config/jvm.options, change -Xmx1g to -Xmx32g to give elasticsearch a larger JVM heap. If the heap is not large enough, elasticsearch will throw errors.

Finally, we can read the new Wikipedia data file and import it into elasticsearch. The script is as follows:

from multiprocessing import Process, Queue
import json
import os
import time

from tqdm.auto import tqdm
from elasticsearch import Elasticsearch
from elasticsearch import helpers


consumer_n = 20  # number of consumer processes
buck_size = 40   # pages sent in one bulk request
wiki = "../enwiki-20190320-pages-articles-multistream.json.gz"
total_num = 5226713  # expected number of pages, used as the progress bar total


def import_es(es, lines):
    """Parse a batch of JSON lines and send them in one bulk request."""
    pages = []
    for line in lines:
        page = json.loads(line)
        # Store every field as a JSON-encoded string.
        for key in page.keys():
            page[key] = json.dumps(page[key])
        if 'interlinks' in page:
            page['entitylinks'] = page.pop('interlinks')
        page['_index'] = 'enwiki_20190320'
        page['_type'] = "wiki"
        pages.append(page)
    helpers.bulk(es, pages)


def producer(q, wikifile):
    """Stream the file into the queue, then send one stop signal per consumer."""
    # The progress bar lives in the producer, the only process that updates it.
    pbar = tqdm(total=total_num)
    cnt = 0
    print("processing...")
    for line in os.popen("zcat %s" % wikifile):
        # Lines consumed so far = lines read - lines still waiting in the queue.
        pbar.update(max(cnt - q.qsize() - pbar.n, 0))
        q.put(line)
        cnt += 1
        pbar.set_description("Queue Remain: %s" % q.qsize())
    print("total pages: %d" % cnt)

    # One None per consumer tells each of them to stop.
    for i in range(consumer_n):
        q.put(None)
    # Keep the progress bar moving until the queue is drained.
    while q.qsize() > 0:
        time.sleep(1)
        pbar.update(max(cnt - q.qsize() - pbar.n, 0))
        pbar.set_description("Queue Remain: %s" % q.qsize())
    pbar.close()
    print("End producer")


def consumer(q):
    """Collect lines from the queue and import them in batches of buck_size."""
    es = Elasticsearch([{'host': 'localhost', 'port': 9200}], timeout=50, max_retries=10, retry_on_timeout=True)
    pages = []
    while True:
        res = q.get()
        # Flush when the batch is full or when the stop signal arrives.
        if res is None or len(pages) >= buck_size:
            import_es(es, pages)
            pages = []
        if res is None:
            break
        pages.append(res)

    print("End consumer")


if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer, args=(q, wiki))
    p.start()

    cs = []
    for _ in range(consumer_n):
        c = Process(target=consumer, args=(q,))
        c.start()
        cs.append(c)

    p.join()

    for c in cs:
        c.join()

    print('Finish Processing.')
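
The per-line transformation done in import_es can be checked in isolation, without a running elasticsearch. The sketch below mirrors that logic in a standalone function; the name to_action and its parameters are hypothetical, introduced only for this illustration, while the index name, the type name and the interlinks-to-entitylinks rename are taken from the script above.

```python
import json


def to_action(line, index="enwiki_20190320", doc_type="wiki"):
    """Turn one JSON line into a dict shaped like the ones passed to helpers.bulk.

    Mirrors import_es: every field value is re-encoded as a JSON string,
    'interlinks' is renamed to 'entitylinks', and the bulk metadata keys
    '_index' and '_type' are added.
    """
    page = json.loads(line)
    doc = {key: json.dumps(value) for key, value in page.items()}
    if "interlinks" in doc:
        doc["entitylinks"] = doc.pop("interlinks")
    doc["_index"] = index
    doc["_type"] = doc_type
    return doc


# A made-up page, for illustration only.
sample = json.dumps({"title": "Example", "section_texts": ["a", "b"], "interlinks": {"x": "y"}})
action = to_action(sample)
```

Note that because every value goes through json.dumps, even plain strings such as the title are stored with surrounding double quotes, and lists are stored as JSON text that has to be decoded with json.loads after retrieval.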

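In this script, we use a producer-consumer model to read and import the data into elasticsearch: one producer process reads the file line by line into a queue, and 20 consumer processes take lines from the queue and send them to elasticsearch in bulk requests of 40 pages each.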
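
The batching and shutdown logic of this model is easier to see with elasticsearch taken out. The following is a minimal sketch under stated assumptions: it uses threads and queue.Queue instead of processes so that it runs anywhere, and a plain list stands in for the bulk request. The names run_pipeline and handle are hypothetical. As in the script, the producer puts one None per consumer on the queue as a stop signal, and each consumer flushes its batch when the batch is full or when the stop signal arrives.

```python
import queue
import threading


def run_pipeline(items, n_consumers=4, batch_size=3):
    """Feed items through a queue to several consumers and return the batches they formed."""
    q = queue.Queue()
    batches = []
    lock = threading.Lock()

    def handle(batch):
        # Stand-in for the bulk request: just record the batch.
        with lock:
            batches.append(list(batch))

    def consumer():
        batch = []
        while True:
            item = q.get()
            # Flush when the batch is full or when the stop signal arrives.
            if item is None or len(batch) >= batch_size:
                if batch:
                    handle(batch)
                batch = []
            if item is None:
                break
            batch.append(item)

    workers = [threading.Thread(target=consumer) for _ in range(n_consumers)]
    for w in workers:
        w.start()

    # Producer: enqueue all items, then one stop signal per consumer.
    for item in items:
        q.put(item)
    for _ in workers:
        q.put(None)

    for w in workers:
        w.join()
    return batches


result = run_pipeline(range(10), n_consumers=3, batch_size=4)
```

Which consumer receives which item depends on scheduling, so the exact batches differ from run to run, but every item ends up in exactly one batch and no batch exceeds batch_size.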